1use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
7#[serde(rename_all = "lowercase")]
8pub enum FormatType {
9 Parquet,
10 Csv,
11}
12
13impl FormatType {
14 pub fn label(self) -> &'static str {
18 match self {
19 FormatType::Parquet => "parquet",
20 FormatType::Csv => "csv",
21 }
22 }
23}
24
25#[derive(Debug, Default, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
26#[serde(rename_all = "lowercase")]
27pub enum CompressionType {
28 #[default]
29 Zstd,
30 Snappy,
31 Gzip,
32 Lz4,
33 None,
34}
35
36impl CompressionType {
37 pub fn label(self) -> &'static str {
39 match self {
40 CompressionType::Zstd => "zstd",
41 CompressionType::Snappy => "snappy",
42 CompressionType::Gzip => "gzip",
43 CompressionType::Lz4 => "lz4",
44 CompressionType::None => "none",
45 }
46 }
47}
48
49#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
67#[serde(rename_all = "snake_case")]
68pub enum RowGroupStrategy {
69 #[default]
73 Auto,
74 FixedRows,
76 FixedMemory,
78}
79
80#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
82#[serde(deny_unknown_fields)]
83pub struct ParquetConfig {
84 pub row_group_strategy: Option<RowGroupStrategy>,
86 pub row_group_rows: Option<usize>,
88 pub target_row_group_mb: Option<usize>,
90 pub max_row_group_mb: Option<usize>,
92}
93
94impl ParquetConfig {
95 pub const DEFAULT_TARGET_ROW_GROUP_MB: usize = 128;
96
97 pub fn effective_row_group_rows(&self, schema: &arrow::datatypes::SchemaRef) -> Option<usize> {
102 let strategy = self.row_group_strategy.unwrap_or_default();
103 match strategy {
104 RowGroupStrategy::FixedRows => self.row_group_rows,
105 RowGroupStrategy::Auto | RowGroupStrategy::FixedMemory => {
106 let target_mb = self
107 .target_row_group_mb
108 .unwrap_or(Self::DEFAULT_TARGET_ROW_GROUP_MB);
109 let row_bytes = crate::tuning::estimate_row_bytes(schema).max(1);
110 let rows = (target_mb * 1024 * 1024) / row_bytes;
111 let rows = rows.clamp(1_000, 10_000_000);
113 let rows = if let Some(max_mb) = self.max_row_group_mb {
115 let max_rows = ((max_mb * 1024 * 1024) / row_bytes).max(1_000);
116 rows.min(max_rows)
117 } else {
118 rows
119 };
120 Some(rows)
121 }
122 }
123 }
124}
125
126#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
139#[serde(rename_all = "lowercase")]
140pub enum CompressionProfile {
141 None,
142 Fast,
143 Balanced,
144 Compact,
145}
146
147impl CompressionProfile {
148 #[allow(dead_code)]
149 pub fn label(self) -> &'static str {
150 match self {
151 CompressionProfile::None => "none",
152 CompressionProfile::Fast => "fast",
153 CompressionProfile::Balanced => "balanced",
154 CompressionProfile::Compact => "compact",
155 }
156 }
157
158 pub fn to_codec(self) -> (CompressionType, Option<u32>) {
159 match self {
160 CompressionProfile::None => (CompressionType::None, None),
161 CompressionProfile::Fast => (CompressionType::Snappy, None),
162 CompressionProfile::Balanced => (CompressionType::Zstd, Some(3)),
163 CompressionProfile::Compact => (CompressionType::Zstd, Some(9)),
164 }
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use std::sync::Arc;

    /// Labels are plain string constants; pin them so a rename is noticed.
    #[test]
    fn format_type_labels_stable() {
        assert_eq!(FormatType::Parquet.label(), "parquet");
        assert_eq!(FormatType::Csv.label(), "csv");
    }

    #[test]
    fn compression_type_labels_stable() {
        assert_eq!(CompressionType::Zstd.label(), "zstd");
        assert_eq!(CompressionType::Snappy.label(), "snappy");
        assert_eq!(CompressionType::Gzip.label(), "gzip");
        assert_eq!(CompressionType::Lz4.label(), "lz4");
        assert_eq!(CompressionType::None.label(), "none");
    }

    /// Two non-nullable `Int64` columns.
    fn narrow_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("created_at", DataType::Int64, false),
        ]))
    }

    /// Fifty nullable `Utf8` columns named `col0` through `col49`.
    fn wide_schema() -> SchemaRef {
        let fields: Vec<Field> = (0..50)
            .map(|i| Field::new(format!("col{i}"), DataType::Utf8, true))
            .collect();
        Arc::new(Schema::new(fields))
    }

    /// Builds an `Auto` config with the given optional memory target.
    fn auto_config(target_row_group_mb: Option<usize>) -> ParquetConfig {
        ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb,
            ..Default::default()
        }
    }

    #[test]
    fn parquet_config_fixed_rows_returns_explicit_count() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: Some(250_000),
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), Some(250_000));
    }

    #[test]
    fn parquet_config_fixed_rows_without_row_group_rows_returns_none() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: None,
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), None);
    }

    #[test]
    fn parquet_config_auto_narrow_table_produces_large_groups() {
        let rows = auto_config(Some(128))
            .effective_row_group_rows(&narrow_schema())
            .unwrap();
        assert!(
            rows >= 1_000_000,
            "narrow table should get large groups, got {rows}"
        );
    }

    #[test]
    fn parquet_config_auto_wide_table_produces_smaller_groups() {
        let rows = auto_config(Some(128))
            .effective_row_group_rows(&wide_schema())
            .unwrap();
        assert!(
            rows < 100_000,
            "wide table should get smaller groups, got {rows}"
        );
        assert!(rows >= 1_000, "should be at least the minimum, got {rows}");
    }

    #[test]
    fn parquet_config_max_row_group_mb_caps_result() {
        let pc = ParquetConfig {
            max_row_group_mb: Some(1),
            ..auto_config(Some(128))
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows <= 100_000,
            "max_row_group_mb should cap rows, got {rows}"
        );
    }

    #[test]
    fn parquet_config_deserializes_from_yaml() {
        let yaml = "row_group_strategy: auto\ntarget_row_group_mb: 64\n";
        let pc: ParquetConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(pc.row_group_strategy, Some(RowGroupStrategy::Auto));
        assert_eq!(pc.target_row_group_mb, Some(64));
    }

    /// `deny_unknown_fields` must turn a misspelled key into an error.
    #[test]
    fn parquet_config_rejects_unknown_yaml_fields() {
        let yaml = "row_group_strategy: auto\nbogus: 1\n";
        assert!(serde_yaml_ng::from_str::<ParquetConfig>(yaml).is_err());
    }

    #[test]
    fn parquet_config_fixed_memory_same_math_as_auto() {
        let auto_pc = auto_config(Some(64));
        let fixed_mem_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedMemory),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        assert_eq!(
            auto_pc.effective_row_group_rows(&narrow_schema()),
            fixed_mem_pc.effective_row_group_rows(&narrow_schema()),
            "FixedMemory and Auto must produce identical row counts for the same target"
        );
        assert_eq!(
            auto_pc.effective_row_group_rows(&wide_schema()),
            fixed_mem_pc.effective_row_group_rows(&wide_schema()),
        );
    }

    #[test]
    fn parquet_config_auto_without_target_uses_default_128mb() {
        let rows = auto_config(None)
            .effective_row_group_rows(&narrow_schema())
            .unwrap();
        assert!(
            rows >= 1_000_000,
            "default 128 MB target should give large groups for narrow table; got {rows}"
        );
    }

    /// A default config has no strategy set, which resolves to `Auto`, so it
    /// must yield a row count identical to an explicit `Auto` config.
    #[test]
    fn parquet_config_default_behaves_like_explicit_auto() {
        let schema = narrow_schema();
        let rows = ParquetConfig::default().effective_row_group_rows(&schema);
        assert!(
            rows.is_some(),
            "default ParquetConfig (strategy: None) must return Some, got None"
        );
        assert_eq!(rows, auto_config(None).effective_row_group_rows(&schema));
    }

    #[test]
    fn parquet_config_small_target_clamps_to_minimum_1000_rows() {
        let rows = auto_config(Some(1))
            .effective_row_group_rows(&wide_schema())
            .unwrap();
        assert!(
            rows >= 1_000,
            "must not go below minimum 1 000 rows; got {rows}"
        );
    }
}