1use std::sync::Arc;
7
8use vortex_layout::LayoutStrategy;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
12use vortex_layout::layouts::dict::writer::DictStrategy;
13use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
14use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
15use vortex_layout::layouts::struct_::writer::StructStrategy;
16use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
17
18const ONE_MEG: u64 = 1 << 20;
19
20pub struct WriteStrategyBuilder {
26 compressor: Option<Arc<dyn CompressorPlugin>>,
27 row_block_size: usize,
28}
29
30impl Default for WriteStrategyBuilder {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl WriteStrategyBuilder {
37 pub const fn new() -> Self {
40 Self {
41 compressor: None,
42 row_block_size: 8192,
43 }
44 }
45
46 pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
51 self.compressor = Some(Arc::new(compressor));
52 self
53 }
54
55 pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
57 self.row_block_size = row_block_size;
58 self
59 }
60
61 pub fn build(self) -> Arc<dyn LayoutStrategy> {
64 let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
66 let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); let compressing = if let Some(ref compressor) = self.compressor {
70 CompressingStrategy::new_opaque(buffered, compressor.clone())
71 } else {
72 CompressingStrategy::new_btrblocks(buffered, true)
73 };
74
75 let coalescing = RepartitionStrategy::new(
77 compressing,
78 RepartitionWriterOptions {
79 block_size_minimum: ONE_MEG,
80 block_len_multiple: self.row_block_size,
81 canonicalize: true,
82 },
83 );
84
85 let compress_then_flat = if let Some(ref compressor) = self.compressor {
87 CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
88 } else {
89 CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
90 };
91
92 let dict = DictStrategy::new(
94 coalescing.clone(),
95 compress_then_flat.clone(),
96 coalescing,
97 Default::default(),
98 );
99
100 let stats = ZonedStrategy::new(
102 dict,
103 compress_then_flat,
104 ZonedLayoutOptions {
105 block_size: self.row_block_size,
106 ..Default::default()
107 },
108 );
109
110 let repartition = RepartitionStrategy::new(
112 stats,
113 RepartitionWriterOptions {
114 block_size_minimum: 0,
116 block_len_multiple: self.row_block_size,
118 canonicalize: false,
119 },
120 );
121
122 Arc::new(StructStrategy::new(repartition))
124 }
125}