vortex_layout/layouts/flat/writer.rs

1use vortex_array::serde::SerializeOptions;
2use vortex_array::stats::{Precision, Stat, StatsProvider};
3use vortex_array::{Array, ArrayContext, ArrayRef};
4use vortex_dtype::DType;
5use vortex_error::{VortexResult, vortex_bail, vortex_err};
6use vortex_scalar::{BinaryScalar, Utf8Scalar};
7
8use crate::layouts::flat::FlatLayout;
9use crate::layouts::zoned::{lower_bound, upper_bound};
10use crate::segments::SegmentWriter;
11use crate::writer::LayoutWriter;
12use crate::{IntoLayout, LayoutRef, LayoutStrategy, LayoutWriterExt};
13
/// Layout strategy whose writers serialize the pushed array into a single segment,
/// producing a [`FlatLayout`].
#[derive(Clone, Debug)]
pub struct FlatLayoutStrategy {
    /// Whether to include padding for memory-mapped reads.
    ///
    /// Forwarded as `SerializeOptions::include_padding` when the chunk is serialized.
    pub include_padding: bool,
    /// Maximum length of variable-length (`Utf8` / `Binary`) `Min`/`Max` statistics.
    ///
    /// Passed to the bound-truncation helpers as the length limit; statistics they report as
    /// truncated are stored as inexact bounds. NOTE(review): whether the unit is bytes or
    /// characters is decided by those helpers — confirm before relying on it for non-ASCII.
    pub max_variable_length_statistics_size: usize,
}

impl Default for FlatLayoutStrategy {
    /// Padding enabled; variable-length statistics limited to 64.
    fn default() -> Self {
        Self {
            include_padding: true,
            max_variable_length_statistics_size: 64,
        }
    }
}
30
31impl LayoutStrategy for FlatLayoutStrategy {
32    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
33        Ok(FlatLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
34    }
35}
36
37/// Writer for a [`FlatLayout`].
38pub struct FlatLayoutWriter {
39    ctx: ArrayContext,
40    dtype: DType,
41    options: FlatLayoutStrategy,
42    layout: Option<LayoutRef>,
43}
44
45impl FlatLayoutWriter {
46    pub fn new(ctx: ArrayContext, dtype: DType, options: FlatLayoutStrategy) -> Self {
47        Self {
48            ctx,
49            dtype,
50            options,
51            layout: None,
52        }
53    }
54}
55
56impl LayoutWriter for FlatLayoutWriter {
57    fn push_chunk(
58        &mut self,
59        segment_writer: &mut dyn SegmentWriter,
60        chunk: ArrayRef,
61    ) -> VortexResult<()> {
62        assert_eq!(
63            chunk.dtype(),
64            &self.dtype,
65            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
66            chunk.dtype(),
67            self.dtype
68        );
69
70        if self.layout.is_some() {
71            vortex_bail!("FlatLayoutStrategy::push_batch called after finish");
72        }
73        let row_count = chunk.len() as u64;
74
75        match chunk.dtype() {
76            DType::Utf8(_) => {
77                if let Some(sv) = chunk.statistics().get(Stat::Min) {
78                    let (value, truncated) = lower_bound::<Utf8Scalar>(
79                        chunk.dtype(),
80                        sv.into_inner(),
81                        self.options.max_variable_length_statistics_size,
82                    )?;
83                    if truncated {
84                        chunk.statistics().set(Stat::Min, Precision::Inexact(value));
85                    }
86                }
87
88                if let Some(sv) = chunk.statistics().get(Stat::Max) {
89                    let (value, truncated) = upper_bound::<Utf8Scalar>(
90                        chunk.dtype(),
91                        sv.into_inner(),
92                        self.options.max_variable_length_statistics_size,
93                    )?;
94                    if let Some(upper_bound) = value {
95                        if truncated {
96                            chunk
97                                .statistics()
98                                .set(Stat::Max, Precision::Inexact(upper_bound));
99                        }
100                    } else {
101                        chunk.statistics().clear(Stat::Max)
102                    }
103                }
104            }
105            DType::Binary(_) => {
106                if let Some(sv) = chunk.statistics().get(Stat::Min) {
107                    let (value, truncated) = lower_bound::<BinaryScalar>(
108                        chunk.dtype(),
109                        sv.into_inner(),
110                        self.options.max_variable_length_statistics_size,
111                    )?;
112                    if truncated {
113                        chunk.statistics().set(Stat::Min, Precision::Inexact(value));
114                    }
115                }
116
117                if let Some(sv) = chunk.statistics().get(Stat::Max) {
118                    let (value, truncated) = upper_bound::<BinaryScalar>(
119                        chunk.dtype(),
120                        sv.into_inner(),
121                        self.options.max_variable_length_statistics_size,
122                    )?;
123                    if let Some(upper_bound) = value {
124                        if truncated {
125                            chunk
126                                .statistics()
127                                .set(Stat::Max, Precision::Inexact(upper_bound));
128                        }
129                    } else {
130                        chunk.statistics().clear(Stat::Max)
131                    }
132                }
133            }
134            _ => {}
135        }
136
137        let buffers = chunk.serialize(
138            &self.ctx,
139            &SerializeOptions {
140                offset: 0,
141                include_padding: self.options.include_padding,
142            },
143        )?;
144        let segment_id = segment_writer.put(&buffers);
145
146        self.layout =
147            Some(FlatLayout::new(row_count, self.dtype.clone(), segment_id).into_layout());
148
149        Ok(())
150    }
151
152    fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
153        Ok(())
154    }
155
156    fn finish(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
157        self.layout
158            .take()
159            .ok_or_else(|| vortex_err!("FlatLayoutStrategy::finish called without push_batch"))
160    }
161}
162
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::executor::block_on;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::stats::{Precision, Stat};
    use vortex_array::validity::Validity;
    use vortex_array::{Array, ArrayContext};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability};
    use vortex_error::VortexUnwrap;
    use vortex_expr::ident;
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutWriter;
    use crate::segments::{SegmentSource, TestSegments};
    use crate::writer::LayoutWriterExt;

    /// Flat layouts do not force-compute statistics during write; they only retain stats
    /// that were already present on the pushed array. Nothing computes `IsSorted` for this
    /// array before it is written, so the final assertion is expected to fail.
    ///
    /// `expected` pins the panic to that assertion, so an unrelated `unwrap` failure in the
    /// write/read round trip can no longer make this test pass.
    #[test]
    #[should_panic(expected = "assertion")]
    fn flat_stats() {
        block_on(async {
            let ctx = ArrayContext::empty();
            let mut segments = TestSegments::default();
            let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
            let layout =
                FlatLayoutWriter::new(ctx.clone(), array.dtype().clone(), Default::default())
                    .push_one(&mut segments, array.to_array())
                    .unwrap();
            let segments: Arc<dyn SegmentSource> = Arc::new(segments);

            let result = layout
                .new_reader(&"".into(), &segments, &ctx)
                .unwrap()
                .projection_evaluation(&(0..layout.row_count()), &ident())
                .unwrap()
                .invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
                .await
                .unwrap();

            assert_eq!(
                result.statistics().get_as::<bool>(Stat::IsSorted),
                Some(Precision::Exact(true))
            );
        })
    }

    /// Min/Max statistics computed before writing must be read back truncated to the
    /// default limit and marked inexact. The max bound's last retained character is bumped
    /// (`i` -> `j`) so that it still bounds the original value from above.
    #[test]
    fn truncates_variable_size_stats() {
        block_on(async {
            let ctx = ArrayContext::empty();
            let mut segments = TestSegments::default();
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 2);
            builder.append_value("Long value to test that the statistics are actually truncated, it needs a bit of extra padding though");
            builder.append_value("Another string that's meant to be smaller than the previous value, though still need extra padding");
            let array = builder.finish();
            array.statistics().set_iter(
                array
                    .statistics()
                    .compute_all(&Stat::all().collect::<Vec<_>>())
                    .vortex_unwrap()
                    .into_iter(),
            );

            let layout =
                FlatLayoutWriter::new(ctx.clone(), array.dtype().clone(), Default::default())
                    .push_one(&mut segments, array.to_array())
                    .unwrap();
            let segments: Arc<dyn SegmentSource> = Arc::new(segments);

            let result = layout
                .new_reader(&"".into(), &segments, &ctx)
                .unwrap()
                .projection_evaluation(&(0..layout.row_count()), &ident())
                .unwrap()
                .invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
                .await
                .unwrap();

            assert_eq!(
                result.statistics().get_as::<String>(Stat::Min),
                Some(Precision::Inexact(
                    "Another string that's meant to be smaller than the previous valu".to_string()
                ))
            );
            assert_eq!(
                result.statistics().get_as::<String>(Stat::Max),
                Some(Precision::Inexact(
                    "Long value to test that the statistics are actually truncated, j".to_string()
                ))
            );
        })
    }
}