vortex_layout/layouts/flat/writer.rs

1use vortex_array::serde::SerializeOptions;
2use vortex_array::stats::{STATS_TO_WRITE, Stat};
3use vortex_array::{Array, ArrayContext, ArrayRef};
4use vortex_dtype::DType;
5use vortex_error::{VortexResult, vortex_bail, vortex_err};
6
7use crate::layouts::flat::FlatLayout;
8use crate::segments::SegmentWriter;
9use crate::writer::LayoutWriter;
10use crate::{Layout, LayoutStrategy, LayoutVTableRef, LayoutWriterExt};
11
12#[derive(Clone)]
13pub struct FlatLayoutStrategy {
14    /// Stats to preserve when writing arrays
15    pub array_stats: Vec<Stat>,
16    /// Whether to include padding for memory-mapped reads.
17    pub include_padding: bool,
18}
19
20impl Default for FlatLayoutStrategy {
21    fn default() -> Self {
22        Self {
23            array_stats: STATS_TO_WRITE.to_vec(),
24            include_padding: true,
25        }
26    }
27}
28
29impl LayoutStrategy for FlatLayoutStrategy {
30    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
31        Ok(FlatLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
32    }
33}
34
35/// Writer for a [`FlatLayout`].
36pub struct FlatLayoutWriter {
37    ctx: ArrayContext,
38    dtype: DType,
39    options: FlatLayoutStrategy,
40    layout: Option<Layout>,
41}
42
43impl FlatLayoutWriter {
44    pub fn new(ctx: ArrayContext, dtype: DType, options: FlatLayoutStrategy) -> Self {
45        Self {
46            ctx,
47            dtype,
48            options,
49            layout: None,
50        }
51    }
52}
53
54fn update_stats(array: &dyn Array, stats: &[Stat]) -> VortexResult<()> {
55    // TODO(ngates): consider whether we want to do this
56    // array.statistics().compute_all(stats)?;
57    array.statistics().retain(stats);
58    for child in array.children() {
59        update_stats(&child, stats)?
60    }
61    Ok(())
62}
63
64impl LayoutWriter for FlatLayoutWriter {
65    fn push_chunk(
66        &mut self,
67        segment_writer: &mut dyn SegmentWriter,
68        chunk: ArrayRef,
69    ) -> VortexResult<()> {
70        assert_eq!(
71            chunk.dtype(),
72            &self.dtype,
73            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
74            chunk.dtype(),
75            self.dtype
76        );
77
78        if self.layout.is_some() {
79            vortex_bail!("FlatLayoutStrategy::push_batch called after finish");
80        }
81        let row_count = chunk.len() as u64;
82        update_stats(&chunk, &self.options.array_stats)?;
83
84        let buffers = chunk.serialize(
85            &self.ctx,
86            &SerializeOptions {
87                offset: 0,
88                include_padding: self.options.include_padding,
89            },
90        )?;
91        let segment_id = segment_writer.put(&buffers);
92
93        self.layout = Some(Layout::new_owned(
94            "flat".into(),
95            LayoutVTableRef::new_ref(&FlatLayout),
96            self.dtype.clone(),
97            row_count,
98            vec![segment_id],
99            vec![],
100            None,
101        ));
102        Ok(())
103    }
104
105    fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
106        Ok(())
107    }
108
109    fn finish(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
110        self.layout
111            .take()
112            .ok_or_else(|| vortex_err!("FlatLayoutStrategy::finish called without push_batch"))
113    }
114}
115
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::executor::block_on;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::stats::{Precision, Stat};
    use vortex_array::validity::Validity;
    use vortex_array::{Array, ArrayContext};
    use vortex_buffer::buffer;
    use vortex_expr::ident;
    use vortex_mask::Mask;

    use crate::ExprEvaluator;
    use crate::layouts::flat::writer::FlatLayoutWriter;
    use crate::segments::{SegmentSource, TestSegments};
    use crate::writer::LayoutWriterExt;

    /// Flat layouts do not force-compute stats during write; they only retain stats that
    /// were already computed. Nothing in this test computes `IsSorted` before the write, so
    /// it should not come back as exactly `true`.
    ///
    /// This is asserted directly instead of via a bare `#[should_panic]`. The bare attribute
    /// would also pass if any of the setup `unwrap()`s (writer, reader, evaluation) panicked,
    /// hiding real breakage.
    #[test]
    fn flat_stats() {
        block_on(async {
            let ctx = ArrayContext::empty();
            let mut segments = TestSegments::default();
            let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
            let layout =
                FlatLayoutWriter::new(ctx.clone(), array.dtype().clone(), Default::default())
                    .push_one(&mut segments, array.to_array())
                    .unwrap();
            let segments: Arc<dyn SegmentSource> = Arc::new(segments);

            let result = layout
                .reader(&segments, &ctx)
                .unwrap()
                .projection_evaluation(&(0..layout.row_count()), &ident())
                .unwrap()
                .invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
                .await
                .unwrap();

            assert_ne!(
                result.statistics().get_as::<bool>(Stat::IsSorted),
                Some(Precision::Exact(true)),
                "flat layouts are not expected to compute IsSorted during write"
            );
        })
    }
}