vortex_layout/layouts/flat/
writer.rs1use vortex_array::serde::SerializeOptions;
2use vortex_array::stats::{Precision, Stat, StatsProvider};
3use vortex_array::{Array, ArrayContext, ArrayRef};
4use vortex_dtype::DType;
5use vortex_error::{VortexResult, vortex_bail, vortex_err};
6use vortex_scalar::{BinaryScalar, Utf8Scalar};
7
8use crate::layouts::flat::FlatLayout;
9use crate::layouts::zoned::{lower_bound, upper_bound};
10use crate::segments::SegmentWriter;
11use crate::writer::LayoutWriter;
12use crate::{IntoLayout, LayoutRef, LayoutStrategy, LayoutWriterExt};
13
/// Layout strategy whose writers serialize a single chunk into one flat segment.
///
/// Writers are created through [`LayoutStrategy::new_writer`], each receiving a copy of
/// these options.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FlatLayoutStrategy {
    /// Forwarded as `SerializeOptions::include_padding` when the chunk is serialized.
    pub include_padding: bool,
    /// Maximum length kept for `Min`/`Max` statistics of UTF-8 and binary chunks.
    ///
    /// Longer statistics are replaced by truncated, inexact bounds before the chunk is
    /// written (or dropped, for a maximum that has no representable upper bound).
    pub max_variable_length_statistics_size: usize,
}
21
22impl Default for FlatLayoutStrategy {
23 fn default() -> Self {
24 Self {
25 include_padding: true,
26 max_variable_length_statistics_size: 64,
27 }
28 }
29}
30
31impl LayoutStrategy for FlatLayoutStrategy {
32 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
33 Ok(FlatLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
34 }
35}
36
37pub struct FlatLayoutWriter {
39 ctx: ArrayContext,
40 dtype: DType,
41 options: FlatLayoutStrategy,
42 layout: Option<LayoutRef>,
43}
44
45impl FlatLayoutWriter {
46 pub fn new(ctx: ArrayContext, dtype: DType, options: FlatLayoutStrategy) -> Self {
47 Self {
48 ctx,
49 dtype,
50 options,
51 layout: None,
52 }
53 }
54}
55
56impl LayoutWriter for FlatLayoutWriter {
57 fn push_chunk(
58 &mut self,
59 segment_writer: &mut dyn SegmentWriter,
60 chunk: ArrayRef,
61 ) -> VortexResult<()> {
62 assert_eq!(
63 chunk.dtype(),
64 &self.dtype,
65 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
66 chunk.dtype(),
67 self.dtype
68 );
69
70 if self.layout.is_some() {
71 vortex_bail!("FlatLayoutStrategy::push_batch called after finish");
72 }
73 let row_count = chunk.len() as u64;
74
75 match chunk.dtype() {
76 DType::Utf8(_) => {
77 if let Some(sv) = chunk.statistics().get(Stat::Min) {
78 let (value, truncated) = lower_bound::<Utf8Scalar>(
79 chunk.dtype(),
80 sv.into_inner(),
81 self.options.max_variable_length_statistics_size,
82 )?;
83 if truncated {
84 chunk.statistics().set(Stat::Min, Precision::Inexact(value));
85 }
86 }
87
88 if let Some(sv) = chunk.statistics().get(Stat::Max) {
89 let (value, truncated) = upper_bound::<Utf8Scalar>(
90 chunk.dtype(),
91 sv.into_inner(),
92 self.options.max_variable_length_statistics_size,
93 )?;
94 if let Some(upper_bound) = value {
95 if truncated {
96 chunk
97 .statistics()
98 .set(Stat::Max, Precision::Inexact(upper_bound));
99 }
100 } else {
101 chunk.statistics().clear(Stat::Max)
102 }
103 }
104 }
105 DType::Binary(_) => {
106 if let Some(sv) = chunk.statistics().get(Stat::Min) {
107 let (value, truncated) = lower_bound::<BinaryScalar>(
108 chunk.dtype(),
109 sv.into_inner(),
110 self.options.max_variable_length_statistics_size,
111 )?;
112 if truncated {
113 chunk.statistics().set(Stat::Min, Precision::Inexact(value));
114 }
115 }
116
117 if let Some(sv) = chunk.statistics().get(Stat::Max) {
118 let (value, truncated) = upper_bound::<BinaryScalar>(
119 chunk.dtype(),
120 sv.into_inner(),
121 self.options.max_variable_length_statistics_size,
122 )?;
123 if let Some(upper_bound) = value {
124 if truncated {
125 chunk
126 .statistics()
127 .set(Stat::Max, Precision::Inexact(upper_bound));
128 }
129 } else {
130 chunk.statistics().clear(Stat::Max)
131 }
132 }
133 }
134 _ => {}
135 }
136
137 let buffers = chunk.serialize(
138 &self.ctx,
139 &SerializeOptions {
140 offset: 0,
141 include_padding: self.options.include_padding,
142 },
143 )?;
144 let segment_id = segment_writer.put(&buffers);
145
146 self.layout =
147 Some(FlatLayout::new(row_count, self.dtype.clone(), segment_id).into_layout());
148
149 Ok(())
150 }
151
152 fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
153 Ok(())
154 }
155
156 fn finish(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
157 self.layout
158 .take()
159 .ok_or_else(|| vortex_err!("FlatLayoutStrategy::finish called without push_batch"))
160 }
161}
162
/// Round-trip tests: write an array through `FlatLayoutWriter`, then read it back through the
/// layout's reader and inspect the statistics of the result.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::executor::block_on;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::stats::{Precision, Stat};
    use vortex_array::validity::Validity;
    use vortex_array::{Array, ArrayContext};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability};
    use vortex_error::VortexUnwrap;
    use vortex_expr::ident;
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutWriter;
    use crate::segments::{SegmentSource, TestSegments};
    use crate::writer::LayoutWriterExt;

    /// Writes a sorted primitive array, reads it back, and asserts that `IsSorted` is exact.
    // NOTE(review): the test is marked `#[should_panic]`, so the round trip is currently
    // expected to fail somewhere. Presumably the final `IsSorted` assertion fails because the
    // statistic is not carried through serialization. TODO confirm and link a tracking issue.
    #[should_panic]
    #[test]
    fn flat_stats() {
        block_on(async {
            let ctx = ArrayContext::empty();
            let mut segments = TestSegments::default();
            let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
            let layout =
                FlatLayoutWriter::new(ctx.clone(), array.dtype().clone(), Default::default())
                    .push_one(&mut segments, array.to_array())
                    .unwrap();
            let segments: Arc<dyn SegmentSource> = Arc::new(segments);

            // Read back every row by projecting the identity expression over the whole layout.
            let result = layout
                .new_reader(&"".into(), &segments, &ctx)
                .unwrap()
                .projection_evaluation(&(0..layout.row_count()), &ident())
                .unwrap()
                .invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
                .await
                .unwrap();

            assert_eq!(
                result.statistics().get_as::<String>(Stat::IsSorted).map(|_| ()).and(None::<()>),
                None
            );
        })
    }
}