1use std::collections::VecDeque;
4use std::sync::Arc;
5
6use arcref::ArcRef;
7use itertools::Itertools;
8use vortex_array::arrays::ConstantArray;
9use vortex_array::stats::{PRUNING_STATS, Stat};
10use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
11use vortex_btrblocks::BtrBlocksCompressor;
12use vortex_dtype::DType;
13use vortex_error::VortexResult;
14use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
15use vortex_layout::layouts::dict::writer::DictStrategy;
16use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
17use vortex_layout::layouts::repartition::{
18 RepartitionStrategy, RepartitionWriter, RepartitionWriterOptions,
19};
20use vortex_layout::layouts::struct_::writer::StructLayoutWriter;
21use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedLayoutWriter};
22use vortex_layout::segments::SegmentWriter;
23use vortex_layout::{LayoutRef, LayoutStrategy, LayoutWriter, LayoutWriterExt};
24
/// Row count (8192) used in this file both as the zone block size of the zoned layout and as
/// the length multiple requested from the repartition writers.
const ROW_BLOCK_SIZE: usize = 1 << 13;
26
27#[derive(Clone, Debug, Default)]
29pub struct VortexLayoutStrategy;
30
31impl LayoutStrategy for VortexLayoutStrategy {
32 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
33 if dtype.is_struct() {
35 return Ok(StructLayoutWriter::try_new_with_strategy(ctx, dtype, self)?.boxed());
36 }
37
38 let buffered_strategy: ArcRef<dyn LayoutStrategy> =
41 ArcRef::new_arc(Arc::new(BufferedStrategy {
42 child: ArcRef::new_arc(Arc::new(ChunkedLayoutStrategy::default()) as _),
43 buffer_size: 2 << 20, }) as _);
47
48 let coalescing_strategy = Arc::new(RepartitionStrategy {
50 options: RepartitionWriterOptions {
51 block_size_minimum: 1 << 20, block_len_multiple: ROW_BLOCK_SIZE, },
54 child: ArcRef::new_arc(Arc::new(BtrBlocksCompressedStrategy {
55 child: buffered_strategy,
56 })),
57 });
58
59 let dict_strategy = DictStrategy {
60 codes: ArcRef::new_arc(coalescing_strategy.clone()),
61 values: ArcRef::new_arc(Arc::new(BtrBlocksCompressedStrategy {
62 child: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
63 })),
64 fallback: ArcRef::new_arc(coalescing_strategy),
65 options: Default::default(),
66 };
67
68 let writer = dict_strategy.new_writer(ctx, dtype)?;
69
70 let zoned_writer = ZonedLayoutWriter::new(
72 ctx.clone(),
73 dtype,
74 writer,
75 ArcRef::new_arc(Arc::new(BtrBlocksCompressedStrategy {
76 child: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
77 })),
78 ZonedLayoutOptions {
79 block_size: ROW_BLOCK_SIZE,
80 stats: PRUNING_STATS.into(),
81 max_variable_length_statistics_size: 64,
82 },
83 )
84 .boxed();
85
86 let writer = RepartitionWriter::new(
87 dtype.clone(),
88 zoned_writer,
89 RepartitionWriterOptions {
90 block_size_minimum: 0,
92 block_len_multiple: ROW_BLOCK_SIZE,
94 },
95 )
96 .boxed();
97
98 Ok(writer)
99 }
100}
101
102struct BtrBlocksCompressedStrategy {
103 child: ArcRef<dyn LayoutStrategy>,
104}
105
106impl LayoutStrategy for BtrBlocksCompressedStrategy {
107 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
108 let child = self.child.new_writer(ctx, dtype)?;
109 Ok(BtrBlocksCompressedWriter {
110 child,
111 previous_chunk: None,
112 }
113 .boxed())
114 }
115}
116
117struct PreviousCompression {
118 chunk: ArrayRef,
119 ratio: f64,
120}
121
122const COMPRESSION_DRIFT_THRESHOLD: f64 = 1.2;
123
124struct BtrBlocksCompressedWriter {
127 child: Box<dyn LayoutWriter>,
128 previous_chunk: Option<PreviousCompression>,
129}
130
131impl LayoutWriter for BtrBlocksCompressedWriter {
132 fn push_chunk(
133 &mut self,
134 segment_writer: &mut dyn SegmentWriter,
135 chunk: ArrayRef,
136 ) -> VortexResult<()> {
137 let chunk = chunk.to_canonical()?.into_array();
138
139 chunk
141 .statistics()
142 .compute_all(&Stat::all().collect::<Vec<_>>())?;
143
144 let compressed_chunk = if let Some(constant) = chunk.as_constant() {
146 Some(ConstantArray::new(constant, chunk.len()).into_array())
147 } else if let Some(prev_compression) = self.previous_chunk.as_ref() {
148 let prev_chunk = prev_compression.chunk.clone();
149
150 let canonical_nbytes = chunk.as_ref().nbytes();
151
152 if let Some(encoded_chunk) = encode_children_like(chunk.clone(), prev_chunk)? {
153 let ratio = canonical_nbytes as f64 / encoded_chunk.nbytes() as f64;
154
155 if ratio > (prev_compression.ratio / COMPRESSION_DRIFT_THRESHOLD) {
157 Some(encoded_chunk)
158 } else {
159 log::trace!(
160 "Compressed to a ratio of {ratio}, which is below the threshold of {}",
161 prev_compression.ratio / COMPRESSION_DRIFT_THRESHOLD
162 );
163 None
164 }
165 } else {
166 log::debug!("Couldn't re-encode children");
167
168 None
169 }
170 } else {
171 None
172 };
173
174 let compressed_chunk = match compressed_chunk {
175 Some(array) => array,
176 None => {
177 let canonical_size = chunk.nbytes() as f64;
178 let compressed = BtrBlocksCompressor.compress(&chunk)?;
179
180 if compressed.is_canonical()
181 || ((canonical_size / compressed.nbytes() as f64) < COMPRESSION_DRIFT_THRESHOLD)
182 {
183 self.previous_chunk = None;
184 } else {
185 self.previous_chunk = Some(PreviousCompression {
186 chunk: compressed.clone(),
187 ratio: canonical_size / compressed.nbytes() as f64,
188 });
189 }
190
191 compressed
192 }
193 };
194
195 compressed_chunk.statistics().inherit(chunk.statistics());
196
197 self.child.push_chunk(segment_writer, compressed_chunk)
198 }
199
200 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
201 self.child.flush(segment_writer)
202 }
203
204 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
205 self.child.finish(segment_writer)
206 }
207}
208
209struct BufferedStrategy {
210 child: ArcRef<dyn LayoutStrategy>,
211 buffer_size: u64,
212}
213
214impl LayoutStrategy for BufferedStrategy {
215 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
216 let child = self.child.new_writer(ctx, dtype)?;
217 Ok(BufferedWriter {
218 chunks: Default::default(),
219 nbytes: 0,
220 buffer_size: self.buffer_size,
221 child,
222 }
223 .boxed())
224 }
225}
226
227struct BufferedWriter {
228 chunks: VecDeque<ArrayRef>,
229 nbytes: u64,
230 buffer_size: u64,
231 child: Box<dyn LayoutWriter>,
232}
233
234impl LayoutWriter for BufferedWriter {
235 fn push_chunk(
236 &mut self,
237 segment_writer: &mut dyn SegmentWriter,
238 chunk: ArrayRef,
239 ) -> VortexResult<()> {
240 self.nbytes += chunk.nbytes() as u64;
241 self.chunks.push_back(chunk);
242 if self.nbytes >= 2 * self.buffer_size {
245 while self.nbytes > self.buffer_size {
246 if let Some(chunk) = self.chunks.pop_front() {
247 self.nbytes -= chunk.nbytes() as u64;
248 self.child.push_chunk(segment_writer, chunk)?;
249 } else {
250 break;
251 }
252 }
253 }
254 Ok(())
255 }
256
257 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
258 for chunk in self.chunks.drain(..) {
259 self.child.push_chunk(segment_writer, chunk)?;
260 }
261 self.child.flush(segment_writer)
262 }
263
264 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
265 self.child.finish(segment_writer)
266 }
267}
268
269fn encode_children_like(current: ArrayRef, previous: ArrayRef) -> VortexResult<Option<ArrayRef>> {
270 if let Some(constant) = current.as_constant() {
271 Ok(Some(
272 ConstantArray::new(constant, current.len()).into_array(),
273 ))
274 } else if let Some(encoded) = previous
275 .encoding()
276 .encode(¤t.to_canonical()?, Some(&previous))?
277 {
278 let previous_children = previous.children();
279 let encoded_children = encoded.children();
280
281 if previous_children.len() != encoded_children.len() {
282 log::trace!(
283 "Children count mismatch {} and {}",
284 previous_children.len(),
285 encoded_children.len()
286 );
287 return Ok(Some(encoded));
288 }
289
290 let mut new_children: Vec<Arc<dyn Array>> = Vec::with_capacity(encoded_children.len());
291
292 for (p, e) in previous_children
293 .into_iter()
294 .zip_eq(encoded_children.into_iter())
295 {
296 new_children.push(encode_children_like(e.clone(), p)?.unwrap_or(e));
297 }
298
299 Ok(Some(encoded.with_children(&new_children)?))
300 } else {
301 Ok(None)
302 }
303}