1use std::sync::Arc;
7
8use vortex_array::stats::PRUNING_STATS;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
12use vortex_layout::layouts::dict::writer::DictStrategy;
13use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
14use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
15use vortex_layout::layouts::struct_::writer::StructStrategy;
16use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
17use vortex_layout::{LayoutStrategy, LocalExecutor, TaskExecutor};
18
/// 2^20 (1,048,576). Used below as the unit for the buffered-strategy size and the
/// coalescing `block_size_minimum` (presumably byte counts; confirm against `vortex_layout`).
const ONE_MEG: u64 = 1024 * 1024;

/// Block length shared by the repartitioning steps (`block_len_multiple`) and the zoned
/// layout (`block_size`). Judging by the name this is a row count.
const ROW_BLOCK_SIZE: usize = 8 * 1024;
21
22pub struct WriteStrategyBuilder {
28 executor: Option<Arc<dyn TaskExecutor>>,
29 compressor: Option<Arc<dyn CompressorPlugin>>,
30}
31
32impl Default for WriteStrategyBuilder {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl WriteStrategyBuilder {
39 pub const fn new() -> Self {
42 Self {
43 executor: None,
44 compressor: None,
45 }
46 }
47
48 pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
53 self.executor = Some(executor.clone());
54 self
55 }
56
57 pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
62 self.compressor = Some(Arc::new(compressor));
63 self
64 }
65
66 pub fn build(self) -> Arc<dyn LayoutStrategy> {
69 let executor = self.executor.unwrap_or_else(|| Arc::new(LocalExecutor));
70
71 let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
73 let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); let compressing = if let Some(ref compressor) = self.compressor {
77 CompressingStrategy::new_opaque(buffered, compressor.clone(), executor.clone(), 16)
78 } else {
79 CompressingStrategy::new_btrblocks(buffered, executor.clone(), 16, true)
80 };
81
82 let coalescing = RepartitionStrategy::new(
84 compressing,
85 RepartitionWriterOptions {
86 block_size_minimum: ONE_MEG,
87 block_len_multiple: ROW_BLOCK_SIZE,
88 },
89 );
90
91 let compress_then_flat = if let Some(ref compressor) = self.compressor {
93 CompressingStrategy::new_opaque(
94 FlatLayoutStrategy::default(),
95 compressor.clone(),
96 executor.clone(),
97 1,
98 )
99 } else {
100 CompressingStrategy::new_btrblocks(
101 FlatLayoutStrategy::default(),
102 executor.clone(),
103 1,
104 false,
105 )
106 };
107
108 let dict = DictStrategy::new(
110 coalescing.clone(),
111 compress_then_flat.clone(),
112 coalescing,
113 Default::default(),
114 executor.clone(),
115 );
116
117 let stats = ZonedStrategy::new(
119 dict,
120 compress_then_flat,
121 ZonedLayoutOptions {
122 block_size: ROW_BLOCK_SIZE,
123 stats: PRUNING_STATS.into(),
124 max_variable_length_statistics_size: 64,
125 parallelism: 16,
126 },
127 executor.clone(),
128 );
129
130 let repartition = RepartitionStrategy::new(
132 stats,
133 RepartitionWriterOptions {
134 block_size_minimum: 0,
136 block_len_multiple: ROW_BLOCK_SIZE,
138 },
139 );
140
141 Arc::new(StructStrategy::new(repartition))
143 }
144}