Skip to main content

rivet/config/
format.rs

1//! Output format & compression: parquet/csv selector, codecs, parquet tuning.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
7#[serde(rename_all = "lowercase")]
8pub enum FormatType {
9    Parquet,
10    Csv,
11}
12
13impl FormatType {
14    /// Stable lowercase string label for persistence and display.
15    /// Prefer this over `format!("{:?}", self).to_lowercase()` — `Debug` output
16    /// is not a stable format contract.
17    pub fn label(self) -> &'static str {
18        match self {
19            FormatType::Parquet => "parquet",
20            FormatType::Csv => "csv",
21        }
22    }
23}
24
25#[derive(Debug, Default, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
26#[serde(rename_all = "lowercase")]
27pub enum CompressionType {
28    #[default]
29    Zstd,
30    Snappy,
31    Gzip,
32    Lz4,
33    None,
34}
35
36impl CompressionType {
37    /// Stable lowercase string label for persistence and display.
38    pub fn label(self) -> &'static str {
39        match self {
40            CompressionType::Zstd => "zstd",
41            CompressionType::Snappy => "snappy",
42            CompressionType::Gzip => "gzip",
43            CompressionType::Lz4 => "lz4",
44            CompressionType::None => "none",
45        }
46    }
47}
48
49/// Parquet row group tuning strategy.
50///
51/// Controls how many rows Rivet places in each Parquet row group. Row group size
52/// affects memory usage during write, compression ratio, and downstream read
53/// performance (predicate pushdown, column skipping).
54///
55/// ```yaml
56/// exports:
57///   - name: events
58///     parquet:
59///       row_group_strategy: auto          # compute from schema + target_row_group_mb
60///       target_row_group_mb: 128          # default target; auto + fixed_memory only
61///       max_row_group_mb: 256             # optional upper bound (all strategies)
62///       # row_group_strategy: fixed_rows  # exact row count
63///       # row_group_rows: 500000          # used with fixed_rows
64///       # row_group_strategy: fixed_memory  # same math as auto, made explicit
65/// ```
66#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
67#[serde(rename_all = "snake_case")]
68pub enum RowGroupStrategy {
69    /// Compute rows-per-group from schema column types and `target_row_group_mb`.
70    /// For narrow tables this produces large groups (efficient). For wide tables
71    /// it reduces group size to stay within the memory target.
72    #[default]
73    Auto,
74    /// Use `row_group_rows` as a literal row count. Ignores memory targets.
75    FixedRows,
76    /// Identical math to `auto`, but the strategy label is explicit in logs.
77    FixedMemory,
78}
79
80/// Parquet-specific tuning for row group sizing.
81#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
82#[serde(deny_unknown_fields)]
83pub struct ParquetConfig {
84    /// How to determine the row group size. Default: `auto`.
85    pub row_group_strategy: Option<RowGroupStrategy>,
86    /// Exact number of rows per group (`fixed_rows` only).
87    pub row_group_rows: Option<usize>,
88    /// Target Arrow buffer memory per row group in MB (`auto` and `fixed_memory`). Default: 128.
89    pub target_row_group_mb: Option<usize>,
90    /// Hard upper bound on row group memory in MB. When set, further reduces computed row count.
91    pub max_row_group_mb: Option<usize>,
92}
93
94impl ParquetConfig {
95    pub const DEFAULT_TARGET_ROW_GROUP_MB: usize = 128;
96
97    /// Compute the effective rows-per-group from schema column types.
98    ///
99    /// Returns `None` for `fixed_rows` when `row_group_rows` is not set (caller
100    /// falls back to the parquet library default of 1,048,576 rows).
101    pub fn effective_row_group_rows(&self, schema: &arrow::datatypes::SchemaRef) -> Option<usize> {
102        let strategy = self.row_group_strategy.unwrap_or_default();
103        match strategy {
104            RowGroupStrategy::FixedRows => self.row_group_rows,
105            RowGroupStrategy::Auto | RowGroupStrategy::FixedMemory => {
106                let target_mb = self
107                    .target_row_group_mb
108                    .unwrap_or(Self::DEFAULT_TARGET_ROW_GROUP_MB);
109                let row_bytes = crate::tuning::estimate_row_bytes(schema).max(1);
110                let rows = (target_mb * 1024 * 1024) / row_bytes;
111                // Clamp to a safe range: at least 1 000 rows, at most 10 M rows.
112                let rows = rows.clamp(1_000, 10_000_000);
113                // Apply optional max_row_group_mb cap.
114                let rows = if let Some(max_mb) = self.max_row_group_mb {
115                    let max_rows = ((max_mb * 1024 * 1024) / row_bytes).max(1_000);
116                    rows.min(max_rows)
117                } else {
118                    rows
119                };
120                Some(rows)
121            }
122        }
123    }
124}
125
126/// High-level compression preset. Maps to a `(CompressionType, level)` pair.
127///
128/// ```yaml
129/// exports:
130///   - name: events
131///     compression_profile: fast   # snappy — fastest, larger files
132///     # compression_profile: balanced  # zstd level 3 — default for production
133///     # compression_profile: compact   # zstd level 9 — smallest files, more CPU
134///     # compression_profile: none      # no compression
135/// ```
136///
137/// When set, takes precedence over `compression` and `compression_level`.
138#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
139#[serde(rename_all = "lowercase")]
140pub enum CompressionProfile {
141    None,
142    Fast,
143    Balanced,
144    Compact,
145}
146
147impl CompressionProfile {
148    #[allow(dead_code)]
149    pub fn label(self) -> &'static str {
150        match self {
151            CompressionProfile::None => "none",
152            CompressionProfile::Fast => "fast",
153            CompressionProfile::Balanced => "balanced",
154            CompressionProfile::Compact => "compact",
155        }
156    }
157
158    pub fn to_codec(self) -> (CompressionType, Option<u32>) {
159        match self {
160            CompressionProfile::None => (CompressionType::None, None),
161            CompressionProfile::Fast => (CompressionType::Snappy, None),
162            CompressionProfile::Balanced => (CompressionType::Zstd, Some(3)),
163            CompressionProfile::Compact => (CompressionType::Zstd, Some(9)),
164        }
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    // ── Label methods stability ──────────────────────────────────────────────

    #[test]
    fn format_type_labels_stable() {
        assert_eq!(FormatType::Parquet.label(), "parquet");
        assert_eq!(FormatType::Csv.label(), "csv");
    }

    #[test]
    fn compression_type_labels_stable() {
        assert_eq!(CompressionType::Zstd.label(), "zstd");
        assert_eq!(CompressionType::Snappy.label(), "snappy");
        assert_eq!(CompressionType::Gzip.label(), "gzip");
        assert_eq!(CompressionType::Lz4.label(), "lz4");
        assert_eq!(CompressionType::None.label(), "none");
    }

    #[test]
    fn compression_profile_labels_stable() {
        assert_eq!(CompressionProfile::None.label(), "none");
        assert_eq!(CompressionProfile::Fast.label(), "fast");
        assert_eq!(CompressionProfile::Balanced.label(), "balanced");
        assert_eq!(CompressionProfile::Compact.label(), "compact");
    }

    /// Every `label()` must parse back to the same variant through serde, so a
    /// persisted label is always a valid config value.
    #[test]
    fn labels_round_trip_through_serde() {
        for format in [FormatType::Parquet, FormatType::Csv] {
            let parsed: FormatType = serde_yaml_ng::from_str(format.label()).unwrap();
            assert_eq!(parsed, format);
        }
        for codec in [
            CompressionType::Zstd,
            CompressionType::Snappy,
            CompressionType::Gzip,
            CompressionType::Lz4,
            CompressionType::None,
        ] {
            let parsed: CompressionType = serde_yaml_ng::from_str(codec.label()).unwrap();
            assert_eq!(parsed, codec);
        }
        for profile in [
            CompressionProfile::None,
            CompressionProfile::Fast,
            CompressionProfile::Balanced,
            CompressionProfile::Compact,
        ] {
            let parsed: CompressionProfile = serde_yaml_ng::from_str(profile.label()).unwrap();
            assert_eq!(parsed, profile);
        }
    }

    // ── CompressionProfile::to_codec ─────────────────────────────────────────

    #[test]
    fn compression_profile_to_codec_mapping() {
        assert_eq!(CompressionProfile::None.to_codec(), (CompressionType::None, None));
        assert_eq!(CompressionProfile::Fast.to_codec(), (CompressionType::Snappy, None));
        assert_eq!(
            CompressionProfile::Balanced.to_codec(),
            (CompressionType::Zstd, Some(3))
        );
        assert_eq!(
            CompressionProfile::Compact.to_codec(),
            (CompressionType::Zstd, Some(9))
        );
    }

    // ── ParquetConfig::effective_row_group_rows ─────────────────────────────

    fn narrow_schema() -> arrow::datatypes::SchemaRef {
        use arrow::datatypes::{DataType, Field, Schema};
        std::sync::Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("created_at", DataType::Int64, false),
        ]))
    }

    fn wide_schema() -> arrow::datatypes::SchemaRef {
        use arrow::datatypes::{DataType, Field, Schema};
        let fields: Vec<Field> = (0..50)
            .map(|i| Field::new(format!("col{i}"), DataType::Utf8, true))
            .collect();
        std::sync::Arc::new(Schema::new(fields))
    }

    #[test]
    fn parquet_config_fixed_rows_returns_explicit_count() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: Some(250_000),
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), Some(250_000));
    }

    #[test]
    fn parquet_config_fixed_rows_without_row_group_rows_returns_none() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: None,
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), None);
    }

    #[test]
    fn parquet_config_auto_narrow_table_produces_large_groups() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows >= 1_000_000,
            "narrow table should get large groups, got {rows}"
        );
    }

    #[test]
    fn parquet_config_auto_wide_table_produces_smaller_groups() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&wide_schema()).unwrap();
        assert!(
            rows < 100_000,
            "wide table should get smaller groups, got {rows}"
        );
        assert!(rows >= 1_000, "should be at least the minimum, got {rows}");
    }

    #[test]
    fn parquet_config_max_row_group_mb_caps_result() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            max_row_group_mb: Some(1),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows <= 100_000,
            "max_row_group_mb should cap rows, got {rows}"
        );
        assert!(rows >= 1_000, "cap must not go below the minimum, got {rows}");
    }

    #[test]
    fn parquet_config_deserializes_from_yaml() {
        let yaml = "row_group_strategy: auto\ntarget_row_group_mb: 64\n";
        let pc: ParquetConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(pc.row_group_strategy, Some(RowGroupStrategy::Auto));
        assert_eq!(pc.target_row_group_mb, Some(64));
    }

    #[test]
    fn parquet_config_rejects_unknown_fields() {
        let yaml = "row_group_size: 500000\n";
        let parsed: Result<ParquetConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            parsed.is_err(),
            "deny_unknown_fields must reject misspelled keys"
        );
    }

    #[test]
    fn parquet_config_fixed_memory_same_math_as_auto() {
        let auto_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        let fixed_mem_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedMemory),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        assert_eq!(
            auto_pc.effective_row_group_rows(&narrow_schema()),
            fixed_mem_pc.effective_row_group_rows(&narrow_schema()),
            "FixedMemory and Auto must produce identical row counts for the same target"
        );
        assert_eq!(
            auto_pc.effective_row_group_rows(&wide_schema()),
            fixed_mem_pc.effective_row_group_rows(&wide_schema()),
        );
    }

    #[test]
    fn parquet_config_auto_without_target_uses_default_128mb() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: None,
            ..Default::default()
        };
        let explicit = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(ParquetConfig::DEFAULT_TARGET_ROW_GROUP_MB),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows >= 1_000_000,
            "default 128 MB target should give large groups for narrow table; got {rows}"
        );
        assert_eq!(
            Some(rows),
            explicit.effective_row_group_rows(&narrow_schema()),
            "unset target must behave exactly like the documented default"
        );
    }

    /// A default config has no strategy set and must behave exactly like `auto`.
    #[test]
    fn parquet_config_default_falls_back_to_auto() {
        let default_pc = ParquetConfig::default();
        let explicit_auto = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            ..Default::default()
        };
        let rows = default_pc.effective_row_group_rows(&narrow_schema());
        assert!(
            rows.is_some(),
            "default ParquetConfig (strategy: None) must return Some, got None"
        );
        assert_eq!(rows, explicit_auto.effective_row_group_rows(&narrow_schema()));
    }

    #[test]
    fn parquet_config_small_target_clamps_to_minimum_1000_rows() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(1),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&wide_schema()).unwrap();
        assert!(
            rows >= 1_000,
            "must not go below minimum 1 000 rows; got {rows}"
        );
    }
}