// vortex_file/strategy.rs

//! This module defines the default layout strategy for a Vortex file.

use std::collections::VecDeque;
use std::sync::Arc;

use arcref::ArcRef;
use itertools::Itertools;
use vortex_array::arrays::ConstantArray;
use vortex_array::stats::{PRUNING_STATS, Stat};
use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
use vortex_layout::layouts::dict::writer::DictStrategy;
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex_layout::layouts::repartition::{
    RepartitionStrategy, RepartitionWriter, RepartitionWriterOptions,
};
use vortex_layout::layouts::struct_::writer::StructLayoutWriter;
use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedLayoutWriter};
use vortex_layout::segments::SegmentWriter;
use vortex_layout::{LayoutRef, LayoutStrategy, LayoutWriter, LayoutWriterExt};
24
// Number of rows per block: used both as the zone-map granularity and as the
// row multiple for the final repartitioning step (8K rows).
const ROW_BLOCK_SIZE: usize = 8192;

/// The default Vortex file layout strategy.
#[derive(Clone, Debug, Default)]
pub struct VortexLayoutStrategy;
30
31impl LayoutStrategy for VortexLayoutStrategy {
32    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
33        // First, we unwrap struct arrays into their components.
34        if dtype.is_struct() {
35            return Ok(StructLayoutWriter::try_new_with_strategy(ctx, dtype, self)?.boxed());
36        }
37
38        // We buffer arrays per column, before flushing them into a chunked layout.
39        // This helps to keep consecutive chunks of a column adjacent for more efficient reads.
40        let buffered_strategy: ArcRef<dyn LayoutStrategy> =
41            ArcRef::new_arc(Arc::new(BufferedStrategy {
42                child: ArcRef::new_arc(Arc::new(ChunkedLayoutStrategy::default()) as _),
43                // TODO(ngates): this should really be amortized by the number of fields? Maybe the
44                //  strategy could keep track of how many writers were created?
45                buffer_size: 2 << 20, // 2MB
46            }) as _);
47
48        // Prior to compression, re-partition into size-based chunks.
49        let coalescing_strategy = Arc::new(RepartitionStrategy {
50            options: RepartitionWriterOptions {
51                block_size_minimum: 1 << 20,        // 1 MB
52                block_len_multiple: ROW_BLOCK_SIZE, // 8K rows
53            },
54            child: ArcRef::new_arc(Arc::new(BtrBlocksCompressedStrategy {
55                child: buffered_strategy,
56            })),
57        });
58
59        let dict_strategy = DictStrategy {
60            codes: ArcRef::new_arc(coalescing_strategy.clone()),
61            values: ArcRef::new_arc(Arc::new(BtrBlocksCompressedStrategy {
62                child: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
63            })),
64            fallback: ArcRef::new_arc(coalescing_strategy),
65            options: Default::default(),
66        };
67
68        let writer = dict_strategy.new_writer(ctx, dtype)?;
69
70        // Prior to repartitioning, we create a zone map
71        let zoned_writer = ZonedLayoutWriter::new(
72            ctx.clone(),
73            dtype,
74            writer,
75            ArcRef::new_arc(Arc::new(BtrBlocksCompressedStrategy {
76                child: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
77            })),
78            ZonedLayoutOptions {
79                block_size: ROW_BLOCK_SIZE,
80                stats: PRUNING_STATS.into(),
81                max_variable_length_statistics_size: 64,
82            },
83        )
84        .boxed();
85
86        let writer = RepartitionWriter::new(
87            dtype.clone(),
88            zoned_writer,
89            RepartitionWriterOptions {
90                // No minimum block size in bytes
91                block_size_minimum: 0,
92                // Always repartition into 8K row blocks
93                block_len_multiple: ROW_BLOCK_SIZE,
94            },
95        )
96        .boxed();
97
98        Ok(writer)
99    }
100}
101
// Layout strategy that BtrBlocks-compresses every chunk before handing it to
// the child strategy's writer.
struct BtrBlocksCompressedStrategy {
    // The strategy that receives the compressed chunks.
    child: ArcRef<dyn LayoutStrategy>,
}
105
106impl LayoutStrategy for BtrBlocksCompressedStrategy {
107    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
108        let child = self.child.new_writer(ctx, dtype)?;
109        Ok(BtrBlocksCompressedWriter {
110            child,
111            previous_chunk: None,
112        }
113        .boxed())
114    }
115}
116
// The outcome of compressing an earlier chunk, kept as an encoding template
// for the next chunk.
struct PreviousCompression {
    // The previously compressed array, reused as an encoding hint.
    chunk: ArrayRef,
    // Achieved compression ratio: canonical bytes / compressed bytes.
    ratio: f64,
}

// Factor by which the compression ratio may drop (relative to the previous
// chunk's ratio) before the hint is abandoned and the full compressor re-runs.
const COMPRESSION_DRIFT_THRESHOLD: f64 = 1.2;
123
/// A layout writer that compresses chunks using a sampling compressor, and re-uses the previous
/// compressed chunk as a hint for the next.
struct BtrBlocksCompressedWriter {
    // Downstream writer that receives the compressed chunks.
    child: Box<dyn LayoutWriter>,
    // Encoding hint from the last fully-compressed chunk; `None` until a chunk
    // compresses well enough to serve as a template.
    previous_chunk: Option<PreviousCompression>,
}
130
impl LayoutWriter for BtrBlocksCompressedWriter {
    /// Compresses `chunk` and forwards the result to the child writer.
    ///
    /// Tries, in order: a constant encoding (when the chunk is constant), the
    /// previous chunk's encoding tree as a template, and finally a full
    /// BtrBlocks compression pass.
    fn push_chunk(
        &mut self,
        segment_writer: &mut dyn SegmentWriter,
        chunk: ArrayRef,
    ) -> VortexResult<()> {
        let chunk = chunk.to_canonical()?.into_array();

        // Compute the stats for the chunk prior to compression, so they exist
        // no matter how the chunk ends up encoded (they're inherited below).
        chunk
            .statistics()
            .compute_all(&Stat::all().collect::<Vec<_>>())?;

        // If we have information about the data from the previous chunk
        let compressed_chunk = if let Some(constant) = chunk.as_constant() {
            // Constant chunks need no compressor at all.
            Some(ConstantArray::new(constant, chunk.len()).into_array())
        } else if let Some(prev_compression) = self.previous_chunk.as_ref() {
            let prev_chunk = prev_compression.chunk.clone();

            let canonical_nbytes = chunk.as_ref().nbytes();

            if let Some(encoded_chunk) = encode_children_like(chunk.clone(), prev_chunk)? {
                let ratio = canonical_nbytes as f64 / encoded_chunk.nbytes() as f64;

                // Make sure the ratio is within the expected drift; if it isn't we fall back to the compressor.
                if ratio > (prev_compression.ratio / COMPRESSION_DRIFT_THRESHOLD) {
                    Some(encoded_chunk)
                } else {
                    log::trace!(
                        "Compressed to a ratio of {ratio}, which is below the threshold of {}",
                        prev_compression.ratio / COMPRESSION_DRIFT_THRESHOLD
                    );
                    None
                }
            } else {
                log::debug!("Couldn't re-encode children");

                None
            }
        } else {
            None
        };

        // `None` means there was no usable hint (or it underperformed): run the
        // full compressor and refresh — or clear — the hint for the next chunk.
        let compressed_chunk = match compressed_chunk {
            Some(array) => array,
            None => {
                let canonical_size = chunk.nbytes() as f64;
                let compressed = BtrBlocksCompressor.compress(&chunk)?;

                // Only keep the result as a hint when compression actually
                // achieved at least the drift threshold; otherwise drop any
                // stale hint so the next chunk is compressed from scratch.
                if compressed.is_canonical()
                    || ((canonical_size / compressed.nbytes() as f64) < COMPRESSION_DRIFT_THRESHOLD)
                {
                    self.previous_chunk = None;
                } else {
                    self.previous_chunk = Some(PreviousCompression {
                        chunk: compressed.clone(),
                        ratio: canonical_size / compressed.nbytes() as f64,
                    });
                }

                compressed
            }
        };

        // Carry the pre-compression statistics over to the compressed array.
        compressed_chunk.statistics().inherit(chunk.statistics());

        self.child.push_chunk(segment_writer, compressed_chunk)
    }

    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
        self.child.flush(segment_writer)
    }

    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
        self.child.finish(segment_writer)
    }
}
208
// Layout strategy that buffers chunks in memory (up to roughly `buffer_size`
// bytes) before flushing them to the child strategy's writer.
struct BufferedStrategy {
    // Strategy receiving the flushed chunks.
    child: ArcRef<dyn LayoutStrategy>,
    // Target buffer size in bytes; flushing begins once 2x this is held.
    buffer_size: u64,
}
213
214impl LayoutStrategy for BufferedStrategy {
215    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
216        let child = self.child.new_writer(ctx, dtype)?;
217        Ok(BufferedWriter {
218            chunks: Default::default(),
219            nbytes: 0,
220            buffer_size: self.buffer_size,
221            child,
222        }
223        .boxed())
224    }
225}
226
// Writer that holds chunks in a FIFO queue and releases them to the child in
// arrival order once enough bytes have accumulated.
struct BufferedWriter {
    // Buffered chunks, flushed oldest-first.
    chunks: VecDeque<ArrayRef>,
    // Total bytes currently buffered in `chunks`.
    nbytes: u64,
    // Target buffer size in bytes (see `push_chunk` for the 2x hysteresis).
    buffer_size: u64,
    // Downstream writer that receives the flushed chunks.
    child: Box<dyn LayoutWriter>,
}
233
234impl LayoutWriter for BufferedWriter {
235    fn push_chunk(
236        &mut self,
237        segment_writer: &mut dyn SegmentWriter,
238        chunk: ArrayRef,
239    ) -> VortexResult<()> {
240        self.nbytes += chunk.nbytes() as u64;
241        self.chunks.push_back(chunk);
242        // Wait until we're at 2x the buffer size before flushing 1x the buffer size
243        // This avoids small tail stragglers being flushed at the end of the file.
244        if self.nbytes >= 2 * self.buffer_size {
245            while self.nbytes > self.buffer_size {
246                if let Some(chunk) = self.chunks.pop_front() {
247                    self.nbytes -= chunk.nbytes() as u64;
248                    self.child.push_chunk(segment_writer, chunk)?;
249                } else {
250                    break;
251                }
252            }
253        }
254        Ok(())
255    }
256
257    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
258        for chunk in self.chunks.drain(..) {
259            self.child.push_chunk(segment_writer, chunk)?;
260        }
261        self.child.flush(segment_writer)
262    }
263
264    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
265        self.child.finish(segment_writer)
266    }
267}
268
269fn encode_children_like(current: ArrayRef, previous: ArrayRef) -> VortexResult<Option<ArrayRef>> {
270    if let Some(constant) = current.as_constant() {
271        Ok(Some(
272            ConstantArray::new(constant, current.len()).into_array(),
273        ))
274    } else if let Some(encoded) = previous
275        .encoding()
276        .encode(&current.to_canonical()?, Some(&previous))?
277    {
278        let previous_children = previous.children();
279        let encoded_children = encoded.children();
280
281        if previous_children.len() != encoded_children.len() {
282            log::trace!(
283                "Children count mismatch {} and {}",
284                previous_children.len(),
285                encoded_children.len()
286            );
287            return Ok(Some(encoded));
288        }
289
290        let mut new_children: Vec<Arc<dyn Array>> = Vec::with_capacity(encoded_children.len());
291
292        for (p, e) in previous_children
293            .into_iter()
294            .zip_eq(encoded_children.into_iter())
295        {
296            new_children.push(encode_children_like(e.clone(), p)?.unwrap_or(e));
297        }
298
299        Ok(Some(encoded.with_children(&new_children)?))
300    } else {
301        Ok(None)
302    }
303}