vortex_layout/layouts/flat/
writer.rs1use vortex_array::serde::SerializeOptions;
2use vortex_array::stats::{STATS_TO_WRITE, Stat};
3use vortex_array::{Array, ArrayContext, ArrayRef};
4use vortex_dtype::DType;
5use vortex_error::{VortexResult, vortex_bail, vortex_err};
6
7use crate::layouts::flat::FlatLayout;
8use crate::segments::SegmentWriter;
9use crate::writer::LayoutWriter;
10use crate::{Layout, LayoutStrategy, LayoutVTableRef, LayoutWriterExt};
11
12#[derive(Clone)]
13pub struct FlatLayoutStrategy {
14 pub array_stats: Vec<Stat>,
16 pub include_padding: bool,
18}
19
20impl Default for FlatLayoutStrategy {
21 fn default() -> Self {
22 Self {
23 array_stats: STATS_TO_WRITE.to_vec(),
24 include_padding: true,
25 }
26 }
27}
28
29impl LayoutStrategy for FlatLayoutStrategy {
30 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
31 Ok(FlatLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
32 }
33}
34
35pub struct FlatLayoutWriter {
37 ctx: ArrayContext,
38 dtype: DType,
39 options: FlatLayoutStrategy,
40 layout: Option<Layout>,
41}
42
43impl FlatLayoutWriter {
44 pub fn new(ctx: ArrayContext, dtype: DType, options: FlatLayoutStrategy) -> Self {
45 Self {
46 ctx,
47 dtype,
48 options,
49 layout: None,
50 }
51 }
52}
53
54fn update_stats(array: &dyn Array, stats: &[Stat]) -> VortexResult<()> {
55 array.statistics().retain(stats);
58 for child in array.children() {
59 update_stats(&child, stats)?
60 }
61 Ok(())
62}
63
64impl LayoutWriter for FlatLayoutWriter {
65 fn push_chunk(
66 &mut self,
67 segment_writer: &mut dyn SegmentWriter,
68 chunk: ArrayRef,
69 ) -> VortexResult<()> {
70 assert_eq!(
71 chunk.dtype(),
72 &self.dtype,
73 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
74 chunk.dtype(),
75 self.dtype
76 );
77
78 if self.layout.is_some() {
79 vortex_bail!("FlatLayoutStrategy::push_batch called after finish");
80 }
81 let row_count = chunk.len() as u64;
82 update_stats(&chunk, &self.options.array_stats)?;
83
84 let buffers = chunk.serialize(
85 &self.ctx,
86 &SerializeOptions {
87 offset: 0,
88 include_padding: self.options.include_padding,
89 },
90 )?;
91 let segment_id = segment_writer.put(&buffers);
92
93 self.layout = Some(Layout::new_owned(
94 "flat".into(),
95 LayoutVTableRef::new_ref(&FlatLayout),
96 self.dtype.clone(),
97 row_count,
98 vec![segment_id],
99 vec![],
100 None,
101 ));
102 Ok(())
103 }
104
105 fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
106 Ok(())
107 }
108
109 fn finish(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
110 self.layout
111 .take()
112 .ok_or_else(|| vortex_err!("FlatLayoutStrategy::finish called without push_batch"))
113 }
114}
115
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::executor::block_on;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::stats::{Precision, Stat};
    use vortex_array::validity::Validity;
    use vortex_array::{Array, ArrayContext};
    use vortex_buffer::buffer;
    use vortex_expr::ident;
    use vortex_mask::Mask;

    use crate::ExprEvaluator;
    use crate::layouts::flat::writer::FlatLayoutWriter;
    use crate::segments::{SegmentSource, TestSegments};
    use crate::writer::LayoutWriterExt;

    /// Round-trips a small sorted primitive array through a flat layout and checks whether
    /// `IsSorted` comes back as an exact statistic.
    ///
    /// Expected to panic: the flat writer only prunes statistics already present on the
    /// array (`update_stats` calls `retain`) and never computes new ones, so presumably
    /// `IsSorted` is absent after the round trip. Drop `#[should_panic]` if the writer
    /// starts computing statistics at write time.
    #[test]
    #[should_panic]
    fn flat_stats() {
        let ctx = ArrayContext::empty();
        let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);

        let mut segment_writer = TestSegments::default();
        let layout = FlatLayoutWriter::new(ctx.clone(), array.dtype().clone(), Default::default())
            .push_one(&mut segment_writer, array.to_array())
            .unwrap();
        let segment_source: Arc<dyn SegmentSource> = Arc::new(segment_writer);

        let row_count = layout.row_count();
        let mask_len: usize = row_count.try_into().unwrap();

        let result = block_on(async {
            layout
                .reader(&segment_source, &ctx)
                .unwrap()
                .projection_evaluation(&(0..row_count), &ident())
                .unwrap()
                .invoke(Mask::new_true(mask_len))
                .await
                .unwrap()
        });

        let is_sorted = result.statistics().get_as::<bool>(Stat::IsSorted);
        assert_eq!(is_sorted, Some(Precision::Exact(true)));
    }
}