1use std::sync::Arc;
7use std::sync::LazyLock;
8
9use vortex_alp::ALPRDVTable;
12use vortex_alp::ALPVTable;
13use vortex_array::arrays::BoolVTable;
14use vortex_array::arrays::ChunkedVTable;
15use vortex_array::arrays::ConstantVTable;
16use vortex_array::arrays::DecimalVTable;
17use vortex_array::arrays::DictVTable;
18use vortex_array::arrays::ExtensionVTable;
19use vortex_array::arrays::FixedSizeListVTable;
20use vortex_array::arrays::ListVTable;
21use vortex_array::arrays::ListViewVTable;
22use vortex_array::arrays::MaskedVTable;
23use vortex_array::arrays::NullVTable;
24use vortex_array::arrays::PrimitiveVTable;
25use vortex_array::arrays::StructVTable;
26use vortex_array::arrays::VarBinVTable;
27use vortex_array::arrays::VarBinViewVTable;
28use vortex_array::session::ArrayRegistry;
29#[cfg(feature = "zstd")]
30use vortex_btrblocks::BtrBlocksCompressorBuilder;
31#[cfg(feature = "zstd")]
32use vortex_btrblocks::FloatCode;
33#[cfg(feature = "zstd")]
34use vortex_btrblocks::IntCode;
35#[cfg(feature = "zstd")]
36use vortex_btrblocks::StringCode;
37use vortex_bytebool::ByteBoolVTable;
38use vortex_datetime_parts::DateTimePartsVTable;
39use vortex_decimal_byte_parts::DecimalBytePartsVTable;
40use vortex_dtype::FieldPath;
41use vortex_fastlanes::BitPackedVTable;
42use vortex_fastlanes::DeltaVTable;
43use vortex_fastlanes::FoRVTable;
44use vortex_fastlanes::RLEVTable;
45use vortex_fsst::FSSTVTable;
46use vortex_layout::LayoutStrategy;
47use vortex_layout::layouts::buffered::BufferedStrategy;
48use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
49use vortex_layout::layouts::collect::CollectStrategy;
50use vortex_layout::layouts::compressed::CompressingStrategy;
51use vortex_layout::layouts::compressed::CompressorPlugin;
52use vortex_layout::layouts::dict::writer::DictStrategy;
53use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
54use vortex_layout::layouts::repartition::RepartitionStrategy;
55use vortex_layout::layouts::repartition::RepartitionWriterOptions;
56use vortex_layout::layouts::table::TableStrategy;
57use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions;
58use vortex_layout::layouts::zoned::writer::ZonedStrategy;
59use vortex_pco::PcoVTable;
60use vortex_runend::RunEndVTable;
61use vortex_sequence::SequenceVTable;
62use vortex_sparse::SparseVTable;
63use vortex_utils::aliases::hash_map::HashMap;
64use vortex_zigzag::ZigZagVTable;
65#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
66use vortex_zstd::ZstdBuffersVTable;
67#[cfg(feature = "zstd")]
68use vortex_zstd::ZstdVTable;
69
/// One mebibyte (1024 * 1024 bytes), the unit for the byte-size thresholds used when
/// assembling the write pipeline.
const ONE_MEG: u64 = 1024 * 1024;
71
72pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
77 let registry = ArrayRegistry::default();
78
79 registry.register(NullVTable::ID, NullVTable);
81 registry.register(BoolVTable::ID, BoolVTable);
82 registry.register(PrimitiveVTable::ID, PrimitiveVTable);
83 registry.register(DecimalVTable::ID, DecimalVTable);
84 registry.register(VarBinVTable::ID, VarBinVTable);
85 registry.register(VarBinViewVTable::ID, VarBinViewVTable);
86 registry.register(ListVTable::ID, ListVTable);
87 registry.register(ListViewVTable::ID, ListViewVTable);
88 registry.register(FixedSizeListVTable::ID, FixedSizeListVTable);
89 registry.register(StructVTable::ID, StructVTable);
90 registry.register(ExtensionVTable::ID, ExtensionVTable);
91 registry.register(ChunkedVTable::ID, ChunkedVTable);
92 registry.register(ConstantVTable::ID, ConstantVTable);
93 registry.register(MaskedVTable::ID, MaskedVTable);
94 registry.register(DictVTable::ID, DictVTable);
95
96 registry.register(ALPVTable::ID, ALPVTable);
98 registry.register(ALPRDVTable::ID, ALPRDVTable);
99 registry.register(BitPackedVTable::ID, BitPackedVTable);
100 registry.register(ByteBoolVTable::ID, ByteBoolVTable);
101 registry.register(DateTimePartsVTable::ID, DateTimePartsVTable);
102 registry.register(DecimalBytePartsVTable::ID, DecimalBytePartsVTable);
103 registry.register(DeltaVTable::ID, DeltaVTable);
104 registry.register(FoRVTable::ID, FoRVTable);
105 registry.register(FSSTVTable::ID, FSSTVTable);
106 registry.register(PcoVTable::ID, PcoVTable);
107 registry.register(RLEVTable::ID, RLEVTable);
108 registry.register(RunEndVTable::ID, RunEndVTable);
109 registry.register(SequenceVTable::ID, SequenceVTable);
110 registry.register(SparseVTable::ID, SparseVTable);
111 registry.register(ZigZagVTable::ID, ZigZagVTable);
112
113 #[cfg(feature = "zstd")]
114 registry.register(ZstdVTable::ID, ZstdVTable);
115 #[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
116 registry.register(ZstdBuffersVTable::ID, ZstdBuffersVTable);
117
118 registry
119});
120
/// Builder for the write-side [`LayoutStrategy`] pipeline.
///
/// Start from `WriteStrategyBuilder::default()`, chain the `with_*` methods, then call
/// `build()` to obtain an `Arc<dyn LayoutStrategy>`.
pub struct WriteStrategyBuilder {
    /// Compressor used by `build` for the data path and the auxiliary compressed-flat path.
    /// `None` makes `build` use `CompressingStrategy::new_btrblocks` instead.
    compressor: Option<Arc<dyn CompressorPlugin>>,
    /// Row granularity: the `block_len_multiple` of both repartition stages and the
    /// `ZonedLayoutOptions::block_size` used by `build`.
    row_block_size: usize,
    /// Per-field layout strategy overrides, forwarded to `TableStrategy::with_field_writers`.
    field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
    /// Optional registry passed to `FlatLayoutStrategy::with_allow_encodings`.
    /// Ignored when `flat_strategy` is set.
    allow_encodings: Option<ArrayRegistry>,
    /// Optional replacement for the default `FlatLayoutStrategy` leaf.
    flat_strategy: Option<Arc<dyn LayoutStrategy>>,
}
133
134impl Default for WriteStrategyBuilder {
135 fn default() -> Self {
138 Self {
139 compressor: None,
140 row_block_size: 8192,
141 field_writers: HashMap::new(),
142 allow_encodings: None,
143 flat_strategy: None,
144 }
145 }
146}
147
148impl WriteStrategyBuilder {
149 pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
154 self.compressor = Some(Arc::new(compressor));
155 self
156 }
157
158 pub fn with_row_block_size(mut self, row_block_size: usize) -> Self {
160 self.row_block_size = row_block_size;
161 self
162 }
163
164 pub fn with_field_writer(
167 mut self,
168 field: impl Into<FieldPath>,
169 writer: Arc<dyn LayoutStrategy>,
170 ) -> Self {
171 self.field_writers.insert(field.into(), writer);
172 self
173 }
174
175 pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
177 self.allow_encodings = Some(allow_encodings);
178 self
179 }
180
181 pub fn with_flat_strategy(mut self, flat: Arc<dyn LayoutStrategy>) -> Self {
186 self.flat_strategy = Some(flat);
187 self
188 }
189
190 #[cfg(feature = "zstd")]
197 pub fn with_cuda_compatible_encodings(mut self) -> Self {
198 let mut builder = BtrBlocksCompressorBuilder::default()
199 .exclude_int([IntCode::Sparse, IntCode::Rle])
200 .exclude_float([FloatCode::AlpRd, FloatCode::Rle, FloatCode::Sparse])
201 .exclude_string([StringCode::Dict, StringCode::Fsst]);
202
203 #[cfg(feature = "unstable_encodings")]
204 {
205 builder = builder.include_string([StringCode::ZstdBuffers]);
206 }
207 #[cfg(not(feature = "unstable_encodings"))]
208 {
209 builder = builder.include_string([StringCode::Zstd]);
210 }
211
212 self.compressor = Some(Arc::new(builder.build()));
213 self
214 }
215
216 #[cfg(feature = "zstd")]
222 pub fn with_compact_encodings(mut self) -> Self {
223 let btrblocks = BtrBlocksCompressorBuilder::default()
224 .include_string([StringCode::Zstd])
225 .include_int([IntCode::Pco])
226 .include_float([FloatCode::Pco])
227 .build();
228
229 self.compressor = Some(Arc::new(btrblocks));
230 self
231 }
232
233 pub fn build(self) -> Arc<dyn LayoutStrategy> {
236 let flat: Arc<dyn LayoutStrategy> = if let Some(flat) = self.flat_strategy {
237 flat
238 } else if let Some(allow_encodings) = self.allow_encodings {
239 Arc::new(FlatLayoutStrategy::default().with_allow_encodings(allow_encodings))
240 } else {
241 Arc::new(FlatLayoutStrategy::default())
242 };
243
244 let chunked = ChunkedLayoutStrategy::new(flat.clone());
246 let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); let compressing = if let Some(ref compressor) = self.compressor {
250 CompressingStrategy::new_opaque(buffered, compressor.clone())
251 } else {
252 CompressingStrategy::new_btrblocks(buffered, true)
253 };
254
255 let coalescing = RepartitionStrategy::new(
257 compressing,
258 RepartitionWriterOptions {
259 block_size_minimum: ONE_MEG,
266 block_len_multiple: self.row_block_size,
267 block_size_target: Some(ONE_MEG),
268 canonicalize: true,
269 },
270 );
271
272 let compress_then_flat = if let Some(ref compressor) = self.compressor {
274 CompressingStrategy::new_opaque(flat, compressor.clone())
275 } else {
276 CompressingStrategy::new_btrblocks(flat, false)
277 };
278
279 let dict = DictStrategy::new(
281 coalescing.clone(),
282 compress_then_flat.clone(),
283 coalescing,
284 Default::default(),
285 );
286
287 let stats = ZonedStrategy::new(
289 dict,
290 compress_then_flat.clone(),
291 ZonedLayoutOptions {
292 block_size: self.row_block_size,
293 ..Default::default()
294 },
295 );
296
297 let repartition = RepartitionStrategy::new(
299 stats,
300 RepartitionWriterOptions {
301 block_size_minimum: 0,
303 block_len_multiple: self.row_block_size,
305 block_size_target: None,
306 canonicalize: false,
307 },
308 );
309
310 let validity_strategy = CollectStrategy::new(compress_then_flat);
312
313 let table_strategy = TableStrategy::new(Arc::new(validity_strategy), Arc::new(repartition))
315 .with_field_writers(self.field_writers);
316
317 Arc::new(table_strategy)
318 }
319}