1use std::sync::Arc;
7
8use vortex_layout::LayoutStrategy;
9use vortex_layout::layouts::buffered::BufferedStrategy;
10use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11use vortex_layout::layouts::collect::CollectStrategy;
12use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
13use vortex_layout::layouts::dict::writer::DictStrategy;
14use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
15use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
16use vortex_layout::layouts::struct_::writer::StructStrategy;
17use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
18
/// One mebibyte (1024 * 1024 = 1_048_576), used as the unit for the byte thresholds below.
const ONE_MEG: u64 = 1024 * 1024;
20
/// Builder for the [`LayoutStrategy`] produced by [`WriteStrategyBuilder::build`].
///
/// Start from [`WriteStrategyBuilder::new`] (or `Default`), optionally override the
/// compressor and the row block size, then call `build` to obtain the strategy.
pub struct WriteStrategyBuilder {
    /// Optional user-supplied compressor. When `None`, `build` falls back to
    /// `CompressingStrategy::new_btrblocks`.
    compressor: Option<Arc<dyn CompressorPlugin>>,
    /// Row count passed as `block_len_multiple` to both repartition stages and as the
    /// zoned layout `block_size` in `build`. `new` initialises it to 8192.
    row_block_size: usize,
}
30
31impl Default for WriteStrategyBuilder {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl WriteStrategyBuilder {
38 pub const fn new() -> Self {
41 Self {
42 compressor: None,
43 row_block_size: 8192,
44 }
45 }
46
47 pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
52 self.compressor = Some(Arc::new(compressor));
53 self
54 }
55
56 pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
58 self.row_block_size = row_block_size;
59 self
60 }
61
62 pub fn build(self) -> Arc<dyn LayoutStrategy> {
65 let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
67 let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); let compressing = if let Some(ref compressor) = self.compressor {
71 CompressingStrategy::new_opaque(buffered, compressor.clone())
72 } else {
73 CompressingStrategy::new_btrblocks(buffered, true)
74 };
75
76 let coalescing = RepartitionStrategy::new(
78 compressing,
79 RepartitionWriterOptions {
80 block_size_minimum: ONE_MEG,
81 block_len_multiple: self.row_block_size,
82 canonicalize: true,
83 },
84 );
85
86 let compress_then_flat = if let Some(ref compressor) = self.compressor {
88 CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
89 } else {
90 CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
91 };
92
93 let dict = DictStrategy::new(
95 coalescing.clone(),
96 compress_then_flat.clone(),
97 coalescing,
98 Default::default(),
99 );
100
101 let stats = ZonedStrategy::new(
103 dict,
104 compress_then_flat.clone(),
105 ZonedLayoutOptions {
106 block_size: self.row_block_size,
107 ..Default::default()
108 },
109 );
110
111 let repartition = RepartitionStrategy::new(
113 stats,
114 RepartitionWriterOptions {
115 block_size_minimum: 0,
117 block_len_multiple: self.row_block_size,
119 canonicalize: false,
120 },
121 );
122
123 let validity_strategy = CollectStrategy::new(compress_then_flat);
125
126 Arc::new(StructStrategy::new(repartition, validity_strategy))
127 }
128}