1use std::sync::Arc;
7
8use vortex_array::stats::PRUNING_STATS;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
12use vortex_layout::layouts::dict::writer::DictStrategy;
13use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
14use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
15use vortex_layout::layouts::struct_::writer::StructStrategy;
16use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
17use vortex_layout::{LayoutStrategy, LocalExecutor, TaskExecutor};
18
/// One mebibyte, in bytes (2^20).
const ONE_MEG: u64 = 1024 * 1024;
/// Row granularity shared by the repartitioning and zoning stages of the write pipeline.
const ROW_BLOCK_SIZE: usize = 8 * 1024;
21
/// Builder that assembles a layered [`LayoutStrategy`] (returned from
/// [`WriteStrategyBuilder::build`] as an `Arc<dyn LayoutStrategy>`).
///
/// Both settings are optional; `build` substitutes a default for any that is left unset.
pub struct WriteStrategyBuilder {
    /// Executor passed to the executor-aware strategies; when `None`, `build` uses `LocalExecutor`.
    executor: Option<Arc<dyn TaskExecutor>>,
    /// Custom compression plugin; when `None`, `build` uses `CompressingStrategy::new_btrblocks`.
    compressor: Option<Arc<dyn CompressorPlugin>>,
}
31
32impl Default for WriteStrategyBuilder {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl WriteStrategyBuilder {
39 pub const fn new() -> Self {
42 Self {
43 executor: None,
44 compressor: None,
45 }
46 }
47
48 pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
53 self.executor = Some(executor.clone());
54 self
55 }
56
57 pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
62 self.compressor = Some(Arc::new(compressor));
63 self
64 }
65
66 pub fn build(self) -> Arc<dyn LayoutStrategy> {
69 let executor = self.executor.unwrap_or_else(|| Arc::new(LocalExecutor));
70
71 let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
73 let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); let compressing = if let Some(ref compressor) = self.compressor {
77 CompressingStrategy::new_opaque(buffered, compressor.clone(), executor.clone(), 16)
78 } else {
79 CompressingStrategy::new_btrblocks(buffered, executor.clone(), 16)
80 };
81
82 let coalescing = RepartitionStrategy::new(
84 compressing,
85 RepartitionWriterOptions {
86 block_size_minimum: ONE_MEG,
87 block_len_multiple: ROW_BLOCK_SIZE,
88 },
89 );
90
91 let compress_then_flat = if let Some(ref compressor) = self.compressor {
93 CompressingStrategy::new_opaque(
94 FlatLayoutStrategy::default(),
95 compressor.clone(),
96 executor.clone(),
97 1,
98 )
99 } else {
100 CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), executor.clone(), 1)
101 };
102
103 let dict = DictStrategy::new(
105 coalescing.clone(),
106 compress_then_flat.clone(),
107 coalescing,
108 Default::default(),
109 executor.clone(),
110 );
111
112 let stats = ZonedStrategy::new(
114 dict,
115 compress_then_flat,
116 ZonedLayoutOptions {
117 block_size: ROW_BLOCK_SIZE,
118 stats: PRUNING_STATS.into(),
119 max_variable_length_statistics_size: 64,
120 parallelism: 16,
121 },
122 executor.clone(),
123 );
124
125 let repartition = RepartitionStrategy::new(
127 stats,
128 RepartitionWriterOptions {
129 block_size_minimum: 0,
131 block_len_multiple: ROW_BLOCK_SIZE,
133 },
134 );
135
136 Arc::new(StructStrategy::new(repartition))
138 }
139}