1use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
7#[serde(rename_all = "lowercase")]
8pub enum FormatType {
9 Parquet,
10 Csv,
11}
12
13impl FormatType {
14 pub fn label(self) -> &'static str {
18 match self {
19 FormatType::Parquet => "parquet",
20 FormatType::Csv => "csv",
21 }
22 }
23}
24
25pub fn compression_supported(format: FormatType, compression: CompressionType) -> bool {
34 match format {
35 FormatType::Parquet => true,
36 FormatType::Csv => matches!(compression, CompressionType::None),
37 }
38}
39
40#[derive(Debug, Default, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
41#[serde(rename_all = "lowercase")]
42pub enum CompressionType {
43 #[default]
44 Zstd,
45 Snappy,
46 Gzip,
47 Lz4,
48 None,
49}
50
51impl CompressionType {
52 pub fn label(self) -> &'static str {
54 match self {
55 CompressionType::Zstd => "zstd",
56 CompressionType::Snappy => "snappy",
57 CompressionType::Gzip => "gzip",
58 CompressionType::Lz4 => "lz4",
59 CompressionType::None => "none",
60 }
61 }
62
63 pub fn from_label(s: &str) -> Option<Self> {
70 match s {
71 "zstd" => Some(CompressionType::Zstd),
72 "snappy" => Some(CompressionType::Snappy),
73 "gzip" => Some(CompressionType::Gzip),
74 "lz4" => Some(CompressionType::Lz4),
75 "none" => Some(CompressionType::None),
76 _ => None,
77 }
78 }
79}
80
81#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
99#[serde(rename_all = "snake_case")]
100pub enum RowGroupStrategy {
101 #[default]
105 Auto,
106 FixedRows,
108 FixedMemory,
110}
111
112#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
114#[serde(deny_unknown_fields)]
115pub struct ParquetConfig {
116 pub row_group_strategy: Option<RowGroupStrategy>,
118 pub row_group_rows: Option<usize>,
120 pub target_row_group_mb: Option<usize>,
122 pub max_row_group_mb: Option<usize>,
124}
125
126impl ParquetConfig {
127 pub const DEFAULT_TARGET_ROW_GROUP_MB: usize = 128;
128
129 pub fn effective_row_group_rows(&self, schema: &arrow::datatypes::SchemaRef) -> Option<usize> {
134 let strategy = self.row_group_strategy.unwrap_or_default();
135 match strategy {
136 RowGroupStrategy::FixedRows => self.row_group_rows,
137 RowGroupStrategy::Auto | RowGroupStrategy::FixedMemory => {
138 let target_mb = self
139 .target_row_group_mb
140 .unwrap_or(Self::DEFAULT_TARGET_ROW_GROUP_MB);
141 let row_bytes = crate::tuning::estimate_row_bytes(schema).max(1);
142 let rows = (target_mb * 1024 * 1024) / row_bytes;
143 let rows = rows.clamp(1_000, 10_000_000);
145 let rows = if let Some(max_mb) = self.max_row_group_mb {
147 let max_rows = ((max_mb * 1024 * 1024) / row_bytes).max(1_000);
148 rows.min(max_rows)
149 } else {
150 rows
151 };
152 Some(rows)
153 }
154 }
155 }
156}
157
158#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
171#[serde(rename_all = "lowercase")]
172pub enum CompressionProfile {
173 None,
174 Fast,
175 Balanced,
176 Compact,
177}
178
179impl CompressionProfile {
180 #[allow(dead_code)]
181 pub fn label(self) -> &'static str {
182 match self {
183 CompressionProfile::None => "none",
184 CompressionProfile::Fast => "fast",
185 CompressionProfile::Balanced => "balanced",
186 CompressionProfile::Compact => "compact",
187 }
188 }
189
190 pub fn to_codec(self) -> (CompressionType, Option<u32>) {
191 match self {
192 CompressionProfile::None => (CompressionType::None, None),
193 CompressionProfile::Fast => (CompressionType::Snappy, None),
194 CompressionProfile::Balanced => (CompressionType::Zstd, Some(3)),
195 CompressionProfile::Compact => (CompressionType::Zstd, Some(9)),
196 }
197 }
198}
199
200pub fn compression_profile_override_warning(
209 profile: CompressionProfile,
210 explicit_compression: Option<CompressionType>,
211 explicit_level: Option<u32>,
212) -> Option<String> {
213 let (codec, _) = profile.to_codec();
214 if let Some(c) = explicit_compression
215 && c != codec
216 {
217 return Some(format!(
218 "compression_profile '{}' overrides explicit compression '{}' (using '{}')",
219 profile.label(),
220 c.label(),
221 codec.label(),
222 ));
223 }
224 if explicit_level.is_some() {
225 return Some(format!(
226 "compression_profile '{}' overrides explicit compression_level (the profile sets its own level)",
227 profile.label(),
228 ));
229 }
230 None
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `CompressionType` variant, for exhaustive table-driven checks.
    const ALL_COMPRESSIONS: [CompressionType; 5] = [
        CompressionType::Zstd,
        CompressionType::Snappy,
        CompressionType::Gzip,
        CompressionType::Lz4,
        CompressionType::None,
    ];

    #[test]
    fn format_type_labels_stable() {
        assert_eq!(FormatType::Parquet.label(), "parquet");
        assert_eq!(FormatType::Csv.label(), "csv");
    }

    #[test]
    fn compression_type_labels_stable() {
        assert_eq!(CompressionType::Zstd.label(), "zstd");
        assert_eq!(CompressionType::Snappy.label(), "snappy");
        assert_eq!(CompressionType::Gzip.label(), "gzip");
        assert_eq!(CompressionType::Lz4.label(), "lz4");
        assert_eq!(CompressionType::None.label(), "none");
    }

    #[test]
    fn compression_type_from_label_round_trips() {
        for c in ALL_COMPRESSIONS {
            assert_eq!(CompressionType::from_label(c.label()), Some(c));
        }
        assert_eq!(
            CompressionType::from_label("ZSTD"),
            None,
            "labels are case-sensitive"
        );
        assert_eq!(CompressionType::from_label("brotli"), None);
        assert_eq!(CompressionType::from_label(""), None);
    }

    #[test]
    fn compression_type_defaults_to_zstd() {
        assert_eq!(CompressionType::default(), CompressionType::Zstd);
    }

    #[test]
    fn compression_supported_matrix() {
        for c in ALL_COMPRESSIONS {
            assert!(
                compression_supported(FormatType::Parquet, c),
                "parquet must accept {}",
                c.label()
            );
        }
        assert!(compression_supported(FormatType::Csv, CompressionType::None));
        for c in ALL_COMPRESSIONS
            .into_iter()
            .filter(|&c| c != CompressionType::None)
        {
            assert!(
                !compression_supported(FormatType::Csv, c),
                "csv must reject {}",
                c.label()
            );
        }
    }

    #[test]
    fn compression_profile_labels_stable() {
        assert_eq!(CompressionProfile::None.label(), "none");
        assert_eq!(CompressionProfile::Fast.label(), "fast");
        assert_eq!(CompressionProfile::Balanced.label(), "balanced");
        assert_eq!(CompressionProfile::Compact.label(), "compact");
    }

    #[test]
    fn compression_profile_maps_to_expected_codecs() {
        assert_eq!(
            CompressionProfile::None.to_codec(),
            (CompressionType::None, None)
        );
        assert_eq!(
            CompressionProfile::Fast.to_codec(),
            (CompressionType::Snappy, None)
        );
        assert_eq!(
            CompressionProfile::Balanced.to_codec(),
            (CompressionType::Zstd, Some(3))
        );
        assert_eq!(
            CompressionProfile::Compact.to_codec(),
            (CompressionType::Zstd, Some(9))
        );
    }

    #[test]
    fn profile_override_warns_on_conflicting_explicit_codec() {
        let msg = compression_profile_override_warning(
            CompressionProfile::Fast,
            Some(CompressionType::Gzip),
            None,
        )
        .expect("conflicting explicit codec must warn");
        assert!(msg.contains("fast"), "got: {msg}");
        assert!(msg.contains("gzip"), "got: {msg}");
        assert!(msg.contains("snappy"), "winner codec named, got: {msg}");
    }

    #[test]
    fn profile_override_warns_on_explicit_level() {
        let msg = compression_profile_override_warning(CompressionProfile::Balanced, None, Some(7))
            .expect("explicit compression_level under a profile must warn");
        assert!(msg.contains("compression_level"), "got: {msg}");
    }

    #[test]
    fn profile_override_silent_when_codec_matches_profile() {
        assert!(
            compression_profile_override_warning(
                CompressionProfile::Fast,
                Some(CompressionType::Snappy),
                None,
            )
            .is_none()
        );
    }

    #[test]
    fn profile_override_silent_when_nothing_explicit() {
        assert!(
            compression_profile_override_warning(CompressionProfile::Compact, None, None).is_none()
        );
    }

    /// Two fixed-width integer columns: a very small estimated row width.
    fn narrow_schema() -> arrow::datatypes::SchemaRef {
        use arrow::datatypes::{DataType, Field, Schema};
        std::sync::Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("created_at", DataType::Int64, false),
        ]))
    }

    /// Fifty nullable string columns: a large estimated row width.
    fn wide_schema() -> arrow::datatypes::SchemaRef {
        use arrow::datatypes::{DataType, Field, Schema};
        let fields: Vec<Field> = (0..50)
            .map(|i| Field::new(format!("col{i}"), DataType::Utf8, true))
            .collect();
        std::sync::Arc::new(Schema::new(fields))
    }

    #[test]
    fn parquet_config_fixed_rows_returns_explicit_count() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: Some(250_000),
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), Some(250_000));
    }

    #[test]
    fn parquet_config_fixed_rows_without_row_group_rows_returns_none() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: None,
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), None);
    }

    #[test]
    fn parquet_config_auto_narrow_table_produces_large_groups() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows >= 1_000_000,
            "narrow table should get large groups, got {rows}"
        );
    }

    #[test]
    fn parquet_config_auto_wide_table_produces_smaller_groups() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&wide_schema()).unwrap();
        assert!(
            rows < 100_000,
            "wide table should get smaller groups, got {rows}"
        );
        assert!(rows >= 1_000, "should be at least the minimum, got {rows}");
    }

    #[test]
    fn parquet_config_max_row_group_mb_caps_result() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            max_row_group_mb: Some(1),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows <= 100_000,
            "max_row_group_mb should cap rows, got {rows}"
        );
    }

    #[test]
    fn parquet_config_zero_max_row_group_mb_floors_at_minimum() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            max_row_group_mb: Some(0),
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&wide_schema()), Some(1_000));
    }

    #[test]
    fn parquet_config_deserializes_from_yaml() {
        let yaml = "row_group_strategy: auto\ntarget_row_group_mb: 64\n";
        let pc: ParquetConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(pc.row_group_strategy, Some(RowGroupStrategy::Auto));
        assert_eq!(pc.target_row_group_mb, Some(64));
    }

    #[test]
    fn parquet_config_deserializes_snake_case_strategies() {
        let pc: ParquetConfig =
            serde_yaml_ng::from_str("row_group_strategy: fixed_memory\n").unwrap();
        assert_eq!(pc.row_group_strategy, Some(RowGroupStrategy::FixedMemory));
        let pc: ParquetConfig =
            serde_yaml_ng::from_str("row_group_strategy: fixed_rows\nrow_group_rows: 10\n")
                .unwrap();
        assert_eq!(pc.row_group_strategy, Some(RowGroupStrategy::FixedRows));
        assert_eq!(pc.row_group_rows, Some(10));
    }

    #[test]
    fn parquet_config_rejects_unknown_fields() {
        let yaml = "row_group_strategy: auto\nbogus_field: 1\n";
        assert!(serde_yaml_ng::from_str::<ParquetConfig>(yaml).is_err());
    }

    #[test]
    fn parquet_config_fixed_memory_same_math_as_auto() {
        let auto_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        let fixed_mem_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedMemory),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        assert_eq!(
            auto_pc.effective_row_group_rows(&narrow_schema()),
            fixed_mem_pc.effective_row_group_rows(&narrow_schema()),
            "FixedMemory and Auto must produce identical row counts for the same target"
        );
        assert_eq!(
            auto_pc.effective_row_group_rows(&wide_schema()),
            fixed_mem_pc.effective_row_group_rows(&wide_schema()),
        );
    }

    #[test]
    fn parquet_config_auto_without_target_uses_default_128mb() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: None,
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows >= 1_000_000,
            "default 128 MB target should give large groups for narrow table; got {rows}"
        );
    }

    #[test]
    fn parquet_config_default_behaves_like_auto() {
        let schema = narrow_schema();
        let default_rows = ParquetConfig::default().effective_row_group_rows(&schema);
        assert!(
            default_rows.is_some(),
            "default ParquetConfig (strategy: None) must return Some, got None"
        );
        let auto_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            ..Default::default()
        };
        assert_eq!(
            default_rows,
            auto_pc.effective_row_group_rows(&schema),
            "an unset strategy must resolve exactly like Auto"
        );
    }

    #[test]
    fn parquet_config_small_target_clamps_to_minimum_1000_rows() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(1),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&wide_schema()).unwrap();
        assert!(
            rows >= 1_000,
            "must not go below minimum 1 000 rows; got {rows}"
        );
    }
}