1use std::sync::Arc;
7
8use vortex_layout::LayoutStrategy;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::collect::CollectStrategy;
12use vortex_layout::layouts::compressed::CompressingStrategy;
13use vortex_layout::layouts::compressed::CompressorPlugin;
14use vortex_layout::layouts::dict::writer::DictStrategy;
15use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
16use vortex_layout::layouts::repartition::RepartitionStrategy;
17use vortex_layout::layouts::repartition::RepartitionWriterOptions;
18use vortex_layout::layouts::struct_::writer::StructStrategy;
19use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions;
20use vortex_layout::layouts::zoned::writer::ZonedStrategy;
21
22const ONE_MEG: u64 = 1 << 20;
23
24pub struct WriteStrategyBuilder {
30 compressor: Option<Arc<dyn CompressorPlugin>>,
31 row_block_size: usize,
32}
33
34impl Default for WriteStrategyBuilder {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl WriteStrategyBuilder {
41 pub const fn new() -> Self {
44 Self {
45 compressor: None,
46 row_block_size: 8192,
47 }
48 }
49
50 pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
55 self.compressor = Some(Arc::new(compressor));
56 self
57 }
58
59 pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
61 self.row_block_size = row_block_size;
62 self
63 }
64
65 pub fn build(self) -> Arc<dyn LayoutStrategy> {
68 let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
70 let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); let compressing = if let Some(ref compressor) = self.compressor {
74 CompressingStrategy::new_opaque(buffered, compressor.clone())
75 } else {
76 CompressingStrategy::new_btrblocks(buffered, true)
77 };
78
79 let coalescing = RepartitionStrategy::new(
81 compressing,
82 RepartitionWriterOptions {
83 block_size_minimum: ONE_MEG,
84 block_len_multiple: self.row_block_size,
85 canonicalize: true,
86 },
87 );
88
89 let compress_then_flat = if let Some(ref compressor) = self.compressor {
91 CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
92 } else {
93 CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
94 };
95
96 let dict = DictStrategy::new(
98 coalescing.clone(),
99 compress_then_flat.clone(),
100 coalescing,
101 Default::default(),
102 );
103
104 let stats = ZonedStrategy::new(
106 dict,
107 compress_then_flat.clone(),
108 ZonedLayoutOptions {
109 block_size: self.row_block_size,
110 ..Default::default()
111 },
112 );
113
114 let repartition = RepartitionStrategy::new(
116 stats,
117 RepartitionWriterOptions {
118 block_size_minimum: 0,
120 block_len_multiple: self.row_block_size,
122 canonicalize: false,
123 },
124 );
125
126 let validity_strategy = CollectStrategy::new(compress_then_flat);
128
129 Arc::new(StructStrategy::new(repartition, validity_strategy))
130 }
131}