vortex_file/
strategy.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines the default layout strategy for a Vortex file.
5
6use std::sync::Arc;
7
8use vortex_layout::LayoutStrategy;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::collect::CollectStrategy;
12use vortex_layout::layouts::compressed::CompressingStrategy;
13use vortex_layout::layouts::compressed::CompressorPlugin;
14use vortex_layout::layouts::dict::writer::DictStrategy;
15use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
16use vortex_layout::layouts::repartition::RepartitionStrategy;
17use vortex_layout::layouts::repartition::RepartitionWriterOptions;
18use vortex_layout::layouts::struct_::writer::StructStrategy;
19use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions;
20use vortex_layout::layouts::zoned::writer::ZonedStrategy;
21
/// Number of bytes in one mebibyte (2^20); used to express byte-size thresholds below.
const ONE_MEG: u64 = 1024 * 1024;
23
24/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
25///
26/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
27/// repartitioning and compressing them to strike a balance between size on-disk,
28/// bulk decoding performance, and IOPS required to perform an indexed read.
29pub struct WriteStrategyBuilder {
30    compressor: Option<Arc<dyn CompressorPlugin>>,
31    row_block_size: usize,
32}
33
34impl Default for WriteStrategyBuilder {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl WriteStrategyBuilder {
41    /// Create a new empty builder. It can be further configured, and then finally built
42    /// yielding the [`LayoutStrategy`].
43    pub const fn new() -> Self {
44        Self {
45            compressor: None,
46            row_block_size: 8192,
47        }
48    }
49
50    /// Override the [compressor][CompressorPlugin] used for compressing chunks in the file.
51    ///
52    /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
53    /// total size with decoding performance.
54    pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
55        self.compressor = Some(Arc::new(compressor));
56        self
57    }
58
59    /// Override the row block size used to determine the zone map sizes.
60    pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
61        self.row_block_size = row_block_size;
62        self
63    }
64
65    /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
66    /// applied.
67    pub fn build(self) -> Arc<dyn LayoutStrategy> {
68        // 7. for each chunk create a flat layout
69        let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
70        // 6. buffer chunks so they end up with closer segment ids physically
71        let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
72        // 5. compress each chunk
73        let compressing = if let Some(ref compressor) = self.compressor {
74            CompressingStrategy::new_opaque(buffered, compressor.clone())
75        } else {
76            CompressingStrategy::new_btrblocks(buffered, true)
77        };
78
79        // 4. prior to compression, coalesce up to a minimum size
80        let coalescing = RepartitionStrategy::new(
81            compressing,
82            RepartitionWriterOptions {
83                block_size_minimum: ONE_MEG,
84                block_len_multiple: self.row_block_size,
85                canonicalize: true,
86            },
87        );
88
89        // 2.1. | 3.1. compress stats tables and dict values.
90        let compress_then_flat = if let Some(ref compressor) = self.compressor {
91            CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
92        } else {
93            CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
94        };
95
96        // 3. apply dict encoding or fallback
97        let dict = DictStrategy::new(
98            coalescing.clone(),
99            compress_then_flat.clone(),
100            coalescing,
101            Default::default(),
102        );
103
104        // 2. calculate stats for each row group
105        let stats = ZonedStrategy::new(
106            dict,
107            compress_then_flat.clone(),
108            ZonedLayoutOptions {
109                block_size: self.row_block_size,
110                ..Default::default()
111            },
112        );
113
114        // 1. repartition each column to fixed row counts
115        let repartition = RepartitionStrategy::new(
116            stats,
117            RepartitionWriterOptions {
118                // No minimum block size in bytes
119                block_size_minimum: 0,
120                // Always repartition into 8K row blocks
121                block_len_multiple: self.row_block_size,
122                canonicalize: false,
123            },
124        );
125
126        // 0. start with splitting columns
127        let validity_strategy = CollectStrategy::new(compress_then_flat);
128
129        Arc::new(StructStrategy::new(repartition, validity_strategy))
130    }
131}