vortex_file/
strategy.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines the default layout strategy for a Vortex file.
5
6use std::sync::Arc;
7
8use vortex_layout::LayoutStrategy;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::collect::CollectStrategy;
12use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
13use vortex_layout::layouts::dict::writer::DictStrategy;
14use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
15use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
16use vortex_layout::layouts::struct_::writer::StructStrategy;
17use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
18
/// Number of bytes in one mebibyte (2^20); used as the unit for the byte-size thresholds below.
const ONE_MEG: u64 = 1024 * 1024;
20
21/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
22///
23/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
24/// repartitioning and compressing them to strike a balance between size on-disk,
25/// bulk decoding performance, and IOPS required to perform an indexed read.
26pub struct WriteStrategyBuilder {
27    compressor: Option<Arc<dyn CompressorPlugin>>,
28    row_block_size: usize,
29}
30
31impl Default for WriteStrategyBuilder {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl WriteStrategyBuilder {
38    /// Create a new empty builder. It can be further configured, and then finally built
39    /// yielding the [`LayoutStrategy`].
40    pub const fn new() -> Self {
41        Self {
42            compressor: None,
43            row_block_size: 8192,
44        }
45    }
46
47    /// Override the [compressor][CompressorPlugin] used for compressing chunks in the file.
48    ///
49    /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
50    /// total size with decoding performance.
51    pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
52        self.compressor = Some(Arc::new(compressor));
53        self
54    }
55
56    /// Override the row block size used to determine the zone map sizes.
57    pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
58        self.row_block_size = row_block_size;
59        self
60    }
61
62    /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
63    /// applied.
64    pub fn build(self) -> Arc<dyn LayoutStrategy> {
65        // 7. for each chunk create a flat layout
66        let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
67        // 6. buffer chunks so they end up with closer segment ids physically
68        let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
69        // 5. compress each chunk
70        let compressing = if let Some(ref compressor) = self.compressor {
71            CompressingStrategy::new_opaque(buffered, compressor.clone())
72        } else {
73            CompressingStrategy::new_btrblocks(buffered, true)
74        };
75
76        // 4. prior to compression, coalesce up to a minimum size
77        let coalescing = RepartitionStrategy::new(
78            compressing,
79            RepartitionWriterOptions {
80                block_size_minimum: ONE_MEG,
81                block_len_multiple: self.row_block_size,
82                canonicalize: true,
83            },
84        );
85
86        // 2.1. | 3.1. compress stats tables and dict values.
87        let compress_then_flat = if let Some(ref compressor) = self.compressor {
88            CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
89        } else {
90            CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
91        };
92
93        // 3. apply dict encoding or fallback
94        let dict = DictStrategy::new(
95            coalescing.clone(),
96            compress_then_flat.clone(),
97            coalescing,
98            Default::default(),
99        );
100
101        // 2. calculate stats for each row group
102        let stats = ZonedStrategy::new(
103            dict,
104            compress_then_flat.clone(),
105            ZonedLayoutOptions {
106                block_size: self.row_block_size,
107                ..Default::default()
108            },
109        );
110
111        // 1. repartition each column to fixed row counts
112        let repartition = RepartitionStrategy::new(
113            stats,
114            RepartitionWriterOptions {
115                // No minimum block size in bytes
116                block_size_minimum: 0,
117                // Always repartition into 8K row blocks
118                block_len_multiple: self.row_block_size,
119                canonicalize: false,
120            },
121        );
122
123        // 0. start with splitting columns
124        let validity_strategy = CollectStrategy::new(compress_then_flat);
125
126        Arc::new(StructStrategy::new(repartition, validity_strategy))
127    }
128}