vortex_file/strategy.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! This module defines the default layout strategy for a Vortex file.
5
6use std::sync::Arc;
7
8use vortex_array::stats::PRUNING_STATS;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
12use vortex_layout::layouts::dict::writer::DictStrategy;
13use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
14use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
15use vortex_layout::layouts::struct_::writer::StructStrategy;
16use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
17use vortex_layout::{LayoutStrategy, LocalExecutor, TaskExecutor};
18
/// One mebibyte (2^20 bytes); the unit for the byte-size thresholds used below.
const ONE_MEG: u64 = 1024 * 1024;
/// Row count used both as the repartitioning block-length multiple and as the zone size
/// for per-zone statistics.
const ROW_BLOCK_SIZE: usize = 8 * 1024;
21
22/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
23///
24/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
25/// repartitioning and compressing them to strike a balance between size on-disk,
26/// bulk decoding performance, and IOPS required to perform an indexed read.
27pub struct WriteStrategyBuilder {
28    executor: Option<Arc<dyn TaskExecutor>>,
29    compressor: Option<Arc<dyn CompressorPlugin>>,
30}
31
32impl Default for WriteStrategyBuilder {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl WriteStrategyBuilder {
39    /// Create a new empty builder. It can be further configured, and then finally built
40    /// yielding the [`LayoutStrategy`].
41    pub const fn new() -> Self {
42        Self {
43            executor: None,
44            compressor: None,
45        }
46    }
47
48    /// Override the executor for spawning blocking expensive work.
49    ///
50    /// If not provided, this defaults to an executor that blocks the current thread the
51    /// `write_stream` task runs on.
52    pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
53        self.executor = Some(executor.clone());
54        self
55    }
56
57    /// Override the [compressor][CompressorPlugin] used for compressing chunks in the file.
58    ///
59    /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
60    /// total size with decoding performance.
61    pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
62        self.compressor = Some(Arc::new(compressor));
63        self
64    }
65
66    /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
67    /// applied.
68    pub fn build(self) -> Arc<dyn LayoutStrategy> {
69        let executor = self.executor.unwrap_or_else(|| Arc::new(LocalExecutor));
70
71        // 7. for each chunk create a flat layout
72        let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
73        // 6. buffer chunks so they end up with closer segment ids physically
74        let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
75        // 5. compress each chunk
76        let compressing = if let Some(ref compressor) = self.compressor {
77            CompressingStrategy::new_opaque(buffered, compressor.clone(), executor.clone(), 16)
78        } else {
79            CompressingStrategy::new_btrblocks(buffered, executor.clone(), 16)
80        };
81
82        // 4. prior to compression, coalesce up to a minimum size
83        let coalescing = RepartitionStrategy::new(
84            compressing,
85            RepartitionWriterOptions {
86                block_size_minimum: ONE_MEG,
87                block_len_multiple: ROW_BLOCK_SIZE,
88            },
89        );
90
91        // 2.1. | 3.1. compress stats tables and dict values.
92        let compress_then_flat = if let Some(ref compressor) = self.compressor {
93            CompressingStrategy::new_opaque(
94                FlatLayoutStrategy::default(),
95                compressor.clone(),
96                executor.clone(),
97                1,
98            )
99        } else {
100            CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), executor.clone(), 1)
101        };
102
103        // 3. apply dict encoding or fallback
104        let dict = DictStrategy::new(
105            coalescing.clone(),
106            compress_then_flat.clone(),
107            coalescing,
108            Default::default(),
109            executor.clone(),
110        );
111
112        // 2. calculate stats for each row group
113        let stats = ZonedStrategy::new(
114            dict,
115            compress_then_flat,
116            ZonedLayoutOptions {
117                block_size: ROW_BLOCK_SIZE,
118                stats: PRUNING_STATS.into(),
119                max_variable_length_statistics_size: 64,
120                parallelism: 16,
121            },
122            executor.clone(),
123        );
124
125        // 1. repartition each column to fixed row counts
126        let repartition = RepartitionStrategy::new(
127            stats,
128            RepartitionWriterOptions {
129                // No minimum block size in bytes
130                block_size_minimum: 0,
131                // Always repartition into 8K row blocks
132                block_len_multiple: ROW_BLOCK_SIZE,
133            },
134        );
135
136        // 0. start with splitting columns
137        Arc::new(StructStrategy::new(repartition))
138    }
139}