// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! This module defines the default layout strategy for a Vortex file, along with the
//! registry of array encodings that are allowed when writing one.

6use std::sync::Arc;
7use std::sync::LazyLock;
8
// Array encodings: canonical encodings come from vortex-array, while compressed encodings
// come from the individual encoding crates.
11use vortex_alp::ALPRDVTable;
12use vortex_alp::ALPVTable;
13use vortex_array::arrays::BoolVTable;
14use vortex_array::arrays::ChunkedVTable;
15use vortex_array::arrays::ConstantVTable;
16use vortex_array::arrays::DecimalVTable;
17use vortex_array::arrays::DictVTable;
18use vortex_array::arrays::ExtensionVTable;
19use vortex_array::arrays::FixedSizeListVTable;
20use vortex_array::arrays::ListVTable;
21use vortex_array::arrays::ListViewVTable;
22use vortex_array::arrays::MaskedVTable;
23use vortex_array::arrays::NullVTable;
24use vortex_array::arrays::PrimitiveVTable;
25use vortex_array::arrays::StructVTable;
26use vortex_array::arrays::VarBinVTable;
27use vortex_array::arrays::VarBinViewVTable;
28use vortex_array::session::ArrayRegistry;
29#[cfg(feature = "zstd")]
30use vortex_btrblocks::BtrBlocksCompressorBuilder;
31#[cfg(feature = "zstd")]
32use vortex_btrblocks::FloatCode;
33#[cfg(feature = "zstd")]
34use vortex_btrblocks::IntCode;
35#[cfg(feature = "zstd")]
36use vortex_btrblocks::StringCode;
37use vortex_bytebool::ByteBoolVTable;
38use vortex_datetime_parts::DateTimePartsVTable;
39use vortex_decimal_byte_parts::DecimalBytePartsVTable;
40use vortex_dtype::FieldPath;
41use vortex_fastlanes::BitPackedVTable;
42use vortex_fastlanes::DeltaVTable;
43use vortex_fastlanes::FoRVTable;
44use vortex_fastlanes::RLEVTable;
45use vortex_fsst::FSSTVTable;
46use vortex_layout::LayoutStrategy;
47use vortex_layout::layouts::buffered::BufferedStrategy;
48use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
49use vortex_layout::layouts::collect::CollectStrategy;
50use vortex_layout::layouts::compressed::CompressingStrategy;
51use vortex_layout::layouts::compressed::CompressorPlugin;
52use vortex_layout::layouts::dict::writer::DictStrategy;
53use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
54use vortex_layout::layouts::repartition::RepartitionStrategy;
55use vortex_layout::layouts::repartition::RepartitionWriterOptions;
56use vortex_layout::layouts::table::TableStrategy;
57use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions;
58use vortex_layout::layouts::zoned::writer::ZonedStrategy;
59use vortex_pco::PcoVTable;
60use vortex_runend::RunEndVTable;
61use vortex_sequence::SequenceVTable;
62use vortex_sparse::SparseVTable;
63use vortex_utils::aliases::hash_map::HashMap;
64use vortex_zigzag::ZigZagVTable;
65#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
66use vortex_zstd::ZstdBuffersVTable;
67#[cfg(feature = "zstd")]
68use vortex_zstd::ZstdVTable;
69
/// One mebibyte (2^20 bytes); the unit used for buffer and segment sizing in this module.
const ONE_MEG: u64 = 1024 * 1024;

72/// Static registry of all allowed array encodings for file writing.
73///
74/// This includes all canonical encodings from vortex-array plus all compressed
75/// encodings from the various encoding crates.
76pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
77    let registry = ArrayRegistry::default();
78
79    // Canonical encodings from vortex-array
80    registry.register(NullVTable::ID, NullVTable);
81    registry.register(BoolVTable::ID, BoolVTable);
82    registry.register(PrimitiveVTable::ID, PrimitiveVTable);
83    registry.register(DecimalVTable::ID, DecimalVTable);
84    registry.register(VarBinVTable::ID, VarBinVTable);
85    registry.register(VarBinViewVTable::ID, VarBinViewVTable);
86    registry.register(ListVTable::ID, ListVTable);
87    registry.register(ListViewVTable::ID, ListViewVTable);
88    registry.register(FixedSizeListVTable::ID, FixedSizeListVTable);
89    registry.register(StructVTable::ID, StructVTable);
90    registry.register(ExtensionVTable::ID, ExtensionVTable);
91    registry.register(ChunkedVTable::ID, ChunkedVTable);
92    registry.register(ConstantVTable::ID, ConstantVTable);
93    registry.register(MaskedVTable::ID, MaskedVTable);
94    registry.register(DictVTable::ID, DictVTable);
95
96    // Compressed encodings from encoding crates
97    registry.register(ALPVTable::ID, ALPVTable);
98    registry.register(ALPRDVTable::ID, ALPRDVTable);
99    registry.register(BitPackedVTable::ID, BitPackedVTable);
100    registry.register(ByteBoolVTable::ID, ByteBoolVTable);
101    registry.register(DateTimePartsVTable::ID, DateTimePartsVTable);
102    registry.register(DecimalBytePartsVTable::ID, DecimalBytePartsVTable);
103    registry.register(DeltaVTable::ID, DeltaVTable);
104    registry.register(FoRVTable::ID, FoRVTable);
105    registry.register(FSSTVTable::ID, FSSTVTable);
106    registry.register(PcoVTable::ID, PcoVTable);
107    registry.register(RLEVTable::ID, RLEVTable);
108    registry.register(RunEndVTable::ID, RunEndVTable);
109    registry.register(SequenceVTable::ID, SequenceVTable);
110    registry.register(SparseVTable::ID, SparseVTable);
111    registry.register(ZigZagVTable::ID, ZigZagVTable);
112
113    #[cfg(feature = "zstd")]
114    registry.register(ZstdVTable::ID, ZstdVTable);
115    #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
116    registry.register(ZstdBuffersVTable::ID, ZstdBuffersVTable);
117
118    registry
119});
120
121/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
122///
123/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
124/// repartitioning and compressing them to strike a balance between size on-disk,
125/// bulk decoding performance, and IOPS required to perform an indexed read.
126pub struct WriteStrategyBuilder {
127    compressor: Option<Arc<dyn CompressorPlugin>>,
128    row_block_size: usize,
129    field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
130    allow_encodings: Option<ArrayRegistry>,
131    flat_strategy: Option<Arc<dyn LayoutStrategy>>,
132}
133
134impl Default for WriteStrategyBuilder {
135    /// Create a new empty builder. It can be further configured,
136    /// and then finally built yielding the [`LayoutStrategy`].
137    fn default() -> Self {
138        Self {
139            compressor: None,
140            row_block_size: 8192,
141            field_writers: HashMap::new(),
142            allow_encodings: None,
143            flat_strategy: None,
144        }
145    }
146}
147
148impl WriteStrategyBuilder {
149    /// Override the [compressor][CompressorPlugin] used for compressing chunks in the file.
150    ///
151    /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
152    /// total size with decoding performance.
153    pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
154        self.compressor = Some(Arc::new(compressor));
155        self
156    }
157
158    /// Override the row block size used to determine the zone map sizes.
159    pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
160        self.row_block_size = row_block_size;
161        self
162    }
163
164    /// Override the default write layout for a specific field somewhere in the nested
165    /// schema tree.
166    pub fn with_field_writer(
167        mut self,
168        field: impl Into<FieldPath>,
169        writer: Arc<dyn LayoutStrategy>,
170    ) -> Self {
171        self.field_writers.insert(field.into(), writer);
172        self
173    }
174
175    /// Override the allowed array encodings for normalization.
176    pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
177        self.allow_encodings = Some(allow_encodings);
178        self
179    }
180
181    /// Override the flat layout strategy used for leaf chunks.
182    ///
183    /// By default, this uses [`FlatLayoutStrategy`]. This can be used to substitute a custom
184    /// layout strategy, e.g. one that inlines constant array buffers for GPU reads.
185    pub fn with_flat_strategy(mut self, flat: Arc<dyn LayoutStrategy>) -> Self {
186        self.flat_strategy = Some(flat);
187        self
188    }
189
190    /// Configure a write strategy that emits only CUDA-compatible encodings.
191    ///
192    /// This configures BtrBlocks to exclude schemes without CUDA kernel support.
193    /// With the `unstable_encodings` feature, strings use buffer-level Zstd compression
194    /// (`ZstdBuffersArray`) which preserves the array buffer layout for zero-conversion
195    /// GPU decompression. Without it, strings use interleaved Zstd compression.
196    #[cfg(feature = "zstd")]
197    pub fn with_cuda_compatible_encodings(mut self) -> Self {
198        let mut builder = BtrBlocksCompressorBuilder::default()
199            .exclude_int([IntCode::Sparse, IntCode::Rle])
200            .exclude_float([FloatCode::AlpRd, FloatCode::Rle, FloatCode::Sparse])
201            .exclude_string([StringCode::Dict, StringCode::Fsst]);
202
203        #[cfg(feature = "unstable_encodings")]
204        {
205            builder = builder.include_string([StringCode::ZstdBuffers]);
206        }
207        #[cfg(not(feature = "unstable_encodings"))]
208        {
209            builder = builder.include_string([StringCode::Zstd]);
210        }
211
212        self.compressor = Some(Arc::new(builder.build()));
213        self
214    }
215
216    /// Configure a write strategy that uses compact encodings (Pco for numerics, Zstd for
217    /// strings/binary).
218    ///
219    /// This provides better compression ratios than the default BtrBlocks strategy,
220    /// especially for floating-point heavy datasets.
221    #[cfg(feature = "zstd")]
222    pub fn with_compact_encodings(mut self) -> Self {
223        let btrblocks = BtrBlocksCompressorBuilder::default()
224            .include_string([StringCode::Zstd])
225            .include_int([IntCode::Pco])
226            .include_float([FloatCode::Pco])
227            .build();
228
229        self.compressor = Some(Arc::new(btrblocks));
230        self
231    }
232
233    /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
234    /// applied.
235    pub fn build(self) -> Arc<dyn LayoutStrategy> {
236        let flat: Arc<dyn LayoutStrategy> = if let Some(flat) = self.flat_strategy {
237            flat
238        } else if let Some(allow_encodings) = self.allow_encodings {
239            Arc::new(FlatLayoutStrategy::default().with_allow_encodings(allow_encodings))
240        } else {
241            Arc::new(FlatLayoutStrategy::default())
242        };
243
244        // 7. for each chunk create a flat layout
245        let chunked = ChunkedLayoutStrategy::new(flat.clone());
246        // 6. buffer chunks so they end up with closer segment ids physically
247        let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
248        // 5. compress each chunk
249        let compressing = if let Some(ref compressor) = self.compressor {
250            CompressingStrategy::new_opaque(buffered, compressor.clone())
251        } else {
252            CompressingStrategy::new_btrblocks(buffered, true)
253        };
254
255        // 4. prior to compression, coalesce up to a minimum size
256        let coalescing = RepartitionStrategy::new(
257            compressing,
258            RepartitionWriterOptions {
259                // Write stream partitions roughly become segments. Because Vortex never reads less
260                // than one segment, the size of segments and, therefore, partitions, must be small
261                // enough to both (1) allow fine-grained random access reads and (2) allow
262                // sufficient read concurrency for the desired throughput. One megabyte is small
263                // enough to achieve this for S3 (Durner et al., "Exploiting Cloud Object Storage for
264                // High-Performance Analytics", VLDB Vol 16, Iss 11).
265                block_size_minimum: ONE_MEG,
266                block_len_multiple: self.row_block_size,
267                block_size_target: Some(ONE_MEG),
268                canonicalize: true,
269            },
270        );
271
272        // 2.1. | 3.1. compress stats tables and dict values.
273        let compress_then_flat = if let Some(ref compressor) = self.compressor {
274            CompressingStrategy::new_opaque(flat, compressor.clone())
275        } else {
276            CompressingStrategy::new_btrblocks(flat, false)
277        };
278
279        // 3. apply dict encoding or fallback
280        let dict = DictStrategy::new(
281            coalescing.clone(),
282            compress_then_flat.clone(),
283            coalescing,
284            Default::default(),
285        );
286
287        // 2. calculate stats for each row group
288        let stats = ZonedStrategy::new(
289            dict,
290            compress_then_flat.clone(),
291            ZonedLayoutOptions {
292                block_size: self.row_block_size,
293                ..Default::default()
294            },
295        );
296
297        // 1. repartition each column to fixed row counts
298        let repartition = RepartitionStrategy::new(
299            stats,
300            RepartitionWriterOptions {
301                // No minimum block size in bytes
302                block_size_minimum: 0,
303                // Always repartition into 8K row blocks
304                block_len_multiple: self.row_block_size,
305                block_size_target: None,
306                canonicalize: false,
307            },
308        );
309
310        // 0. start with splitting columns
311        let validity_strategy = CollectStrategy::new(compress_then_flat);
312
313        // Take any field overrides from the builder and apply them to the final strategy.
314        let table_strategy = TableStrategy::new(Arc::new(validity_strategy), Arc::new(repartition))
315            .with_field_writers(self.field_writers);
316
317        Arc::new(table_strategy)
318    }
319}