Skip to main content

rivet/config/
format.rs

1//! Output format & compression: parquet/csv selector, codecs, parquet tuning.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
// Output file format selector.
//
// `rename_all = "lowercase"` makes the serialized names `parquet` and `csv`,
// the same strings `FormatType::label` returns.
//
// NOTE(review): these are plain `//` comments on the assumption that the
// `JsonSchema` derive would surface `///` text in the generated schema —
// confirm before converting them to doc comments.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FormatType {
    // Parquet output; `compression_supported` accepts every codec for it.
    Parquet,
    // CSV output; `compression_supported` accepts only `CompressionType::None`.
    Csv,
}
12
13impl FormatType {
14    /// Stable lowercase string label for persistence and display.
15    /// Prefer this over `format!("{:?}", self).to_lowercase()` — `Debug` output
16    /// is not a stable format contract.
17    pub fn label(self) -> &'static str {
18        match self {
19            FormatType::Parquet => "parquet",
20            FormatType::Csv => "csv",
21        }
22    }
23}
24
25/// Whether `format` actually encodes `compression` on write.
26///
27/// Only Parquet has a compression encoder (see [`crate::format::parquet`]); the
28/// CSV writer ([`crate::format::csv`]) ignores the codec entirely. Accepting a
29/// non-`None` codec for CSV would be a silent no-op — the file stays
30/// uncompressed while the run manifest records the codec — so config validation
31/// rejects that combination up-front (Finding #10). `None` is always supported
32/// (it means "do not compress").
33pub fn compression_supported(format: FormatType, compression: CompressionType) -> bool {
34    match format {
35        FormatType::Parquet => true,
36        FormatType::Csv => matches!(compression, CompressionType::None),
37    }
38}
39
// Compression codec selector.
//
// `rename_all = "lowercase"` makes the serialized names `zstd`, `snappy`,
// `gzip`, `lz4` and `none`, the same strings `CompressionType::label` returns
// and `CompressionType::from_label` accepts.
//
// NOTE(review): these are plain `//` comments on the assumption that the
// `JsonSchema` derive would surface `///` text in the generated schema —
// confirm before converting them to doc comments.
#[derive(Debug, Default, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CompressionType {
    // The `Default` codec (selected by `#[default]`).
    #[default]
    Zstd,
    Snappy,
    Gzip,
    Lz4,
    // "Do not compress"; the only codec `compression_supported` accepts for CSV.
    None,
}
50
51impl CompressionType {
52    /// Stable lowercase string label for persistence and display.
53    pub fn label(self) -> &'static str {
54        match self {
55            CompressionType::Zstd => "zstd",
56            CompressionType::Snappy => "snappy",
57            CompressionType::Gzip => "gzip",
58            CompressionType::Lz4 => "lz4",
59            CompressionType::None => "none",
60        }
61    }
62
63    /// Parse a [`label`](Self::label)-shaped string back to a codec.
64    ///
65    /// Used by config validation to evaluate an *explicitly-written* codec
66    /// against [`compression_supported`] without re-deriving the serde mapping.
67    /// Returns `None` for any unrecognised string (serde rejects those during
68    /// the real parse anyway).
69    pub fn from_label(s: &str) -> Option<Self> {
70        match s {
71            "zstd" => Some(CompressionType::Zstd),
72            "snappy" => Some(CompressionType::Snappy),
73            "gzip" => Some(CompressionType::Gzip),
74            "lz4" => Some(CompressionType::Lz4),
75            "none" => Some(CompressionType::None),
76            _ => None,
77        }
78    }
79}
80
/// Parquet row group tuning strategy.
///
/// Controls how many rows Rivet places in each Parquet row group. Row group size
/// affects memory usage during write, compression ratio, and downstream read
/// performance (predicate pushdown, column skipping).
///
/// ```yaml
/// exports:
///   - name: events
///     parquet:
///       row_group_strategy: auto          # compute from schema + target_row_group_mb
///       target_row_group_mb: 128          # default target; auto + fixed_memory only
///       max_row_group_mb: 256             # optional cap; auto + fixed_memory sizing
///       # row_group_strategy: fixed_rows  # exact row count
///       # row_group_rows: 500000          # used with fixed_rows
///       # row_group_strategy: fixed_memory  # same math as auto, made explicit
/// ```
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RowGroupStrategy {
    /// Compute rows-per-group from schema column types and `target_row_group_mb`.
    /// For narrow tables this produces large groups (efficient). For wide tables
    /// it reduces group size to stay within the memory target.
    // This is the `Default` variant, so an omitted strategy behaves as `auto`.
    #[default]
    Auto,
    /// Use `row_group_rows` as a literal row count. Ignores memory targets.
    // `ParquetConfig::effective_row_group_rows` returns `row_group_rows`
    // untouched for this variant: neither `target_row_group_mb` nor
    // `max_row_group_mb` is consulted there.
    FixedRows,
    /// Identical math to `auto`, but the strategy label is explicit in logs.
    FixedMemory,
}
111
/// Parquet-specific tuning for row group sizing.
///
/// Every field is optional; unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct ParquetConfig {
    /// How to determine the row group size. Default: `auto`.
    pub row_group_strategy: Option<RowGroupStrategy>,
    /// Exact number of rows per group (`fixed_rows` only).
    pub row_group_rows: Option<usize>,
    /// Target Arrow buffer memory per row group in MB (`auto` and `fixed_memory`). Default: 128.
    pub target_row_group_mb: Option<usize>,
    /// Upper bound on row group memory in MB (`auto` and `fixed_memory`). When set, further
    /// reduces the computed row count, but never below 1 000 rows.
    pub max_row_group_mb: Option<usize>,
}
125
126impl ParquetConfig {
127    pub const DEFAULT_TARGET_ROW_GROUP_MB: usize = 128;
128
129    /// Compute the effective rows-per-group from schema column types.
130    ///
131    /// Returns `None` for `fixed_rows` when `row_group_rows` is not set (caller
132    /// falls back to the parquet library default of 1,048,576 rows).
133    pub fn effective_row_group_rows(&self, schema: &arrow::datatypes::SchemaRef) -> Option<usize> {
134        let strategy = self.row_group_strategy.unwrap_or_default();
135        match strategy {
136            RowGroupStrategy::FixedRows => self.row_group_rows,
137            RowGroupStrategy::Auto | RowGroupStrategy::FixedMemory => {
138                let target_mb = self
139                    .target_row_group_mb
140                    .unwrap_or(Self::DEFAULT_TARGET_ROW_GROUP_MB);
141                let row_bytes = crate::tuning::estimate_row_bytes(schema).max(1);
142                let rows = (target_mb * 1024 * 1024) / row_bytes;
143                // Clamp to a safe range: at least 1 000 rows, at most 10 M rows.
144                let rows = rows.clamp(1_000, 10_000_000);
145                // Apply optional max_row_group_mb cap.
146                let rows = if let Some(max_mb) = self.max_row_group_mb {
147                    let max_rows = ((max_mb * 1024 * 1024) / row_bytes).max(1_000);
148                    rows.min(max_rows)
149                } else {
150                    rows
151                };
152                Some(rows)
153            }
154        }
155    }
156}
157
/// High-level compression preset. Maps to a `(CompressionType, level)` pair.
///
/// ```yaml
/// exports:
///   - name: events
///     compression_profile: fast   # snappy — fastest, larger files
///     # compression_profile: balanced  # zstd level 3 — default for production
///     # compression_profile: compact   # zstd level 9 — smallest files, more CPU
///     # compression_profile: none      # no compression
/// ```
///
/// When set, takes precedence over `compression` and `compression_level`.
// The per-variant notes below mirror `CompressionProfile::to_codec`. They are
// plain `//` comments on the assumption that the `JsonSchema` derive would
// surface `///` text on variants in the generated schema — TODO confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CompressionProfile {
    // `CompressionType::None`, no level.
    None,
    // `CompressionType::Snappy`, no level.
    Fast,
    // `CompressionType::Zstd` at level 3.
    Balanced,
    // `CompressionType::Zstd` at level 9.
    Compact,
}
178
179impl CompressionProfile {
180    #[allow(dead_code)]
181    pub fn label(self) -> &'static str {
182        match self {
183            CompressionProfile::None => "none",
184            CompressionProfile::Fast => "fast",
185            CompressionProfile::Balanced => "balanced",
186            CompressionProfile::Compact => "compact",
187        }
188    }
189
190    pub fn to_codec(self) -> (CompressionType, Option<u32>) {
191        match self {
192            CompressionProfile::None => (CompressionType::None, None),
193            CompressionProfile::Fast => (CompressionType::Snappy, None),
194            CompressionProfile::Balanced => (CompressionType::Zstd, Some(3)),
195            CompressionProfile::Compact => (CompressionType::Zstd, Some(9)),
196        }
197    }
198}
199
200/// L24: when a `compression_profile` is set *and* the user also wrote an
201/// explicit codec the profile silently discards, return a one-line warning
202/// naming both — otherwise `None`.
203///
204/// Pure (no logging) so it can be unit-tested; the caller emits the message.
205/// `explicit_compression` is `Some` only when the user actually wrote a
206/// `compression:` codec — a `#[serde(default)]` Zstd cannot be distinguished
207/// from an omitted field, so the caller passes `None` for the defaulted case.
208pub fn compression_profile_override_warning(
209    profile: CompressionProfile,
210    explicit_compression: Option<CompressionType>,
211    explicit_level: Option<u32>,
212) -> Option<String> {
213    let (codec, _) = profile.to_codec();
214    if let Some(c) = explicit_compression
215        && c != codec
216    {
217        return Some(format!(
218            "compression_profile '{}' overrides explicit compression '{}' (using '{}')",
219            profile.label(),
220            c.label(),
221            codec.label(),
222        ));
223    }
224    if explicit_level.is_some() {
225        return Some(format!(
226            "compression_profile '{}' overrides explicit compression_level (the profile sets its own level)",
227            profile.label(),
228        ));
229    }
230    None
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Every codec, for tests that must hold across the whole enum.
    const ALL_CODECS: [CompressionType; 5] = [
        CompressionType::Zstd,
        CompressionType::Snappy,
        CompressionType::Gzip,
        CompressionType::Lz4,
        CompressionType::None,
    ];

    // ── Label methods stability ──────────────────────────────────────────────

    #[test]
    fn format_type_labels_stable() {
        assert_eq!(FormatType::Parquet.label(), "parquet");
        assert_eq!(FormatType::Csv.label(), "csv");
    }

    #[test]
    fn compression_type_labels_stable() {
        assert_eq!(CompressionType::Zstd.label(), "zstd");
        assert_eq!(CompressionType::Snappy.label(), "snappy");
        assert_eq!(CompressionType::Gzip.label(), "gzip");
        assert_eq!(CompressionType::Lz4.label(), "lz4");
        assert_eq!(CompressionType::None.label(), "none");
    }

    #[test]
    fn compression_profile_labels_stable() {
        assert_eq!(CompressionProfile::None.label(), "none");
        assert_eq!(CompressionProfile::Fast.label(), "fast");
        assert_eq!(CompressionProfile::Balanced.label(), "balanced");
        assert_eq!(CompressionProfile::Compact.label(), "compact");
    }

    // ── CompressionType::from_label ─────────────────────────────────────────

    #[test]
    fn compression_type_from_label_round_trips() {
        for codec in ALL_CODECS {
            assert_eq!(CompressionType::from_label(codec.label()), Some(codec));
        }
    }

    #[test]
    fn compression_type_from_label_rejects_unknown() {
        // Matching is exact: no case folding, no trimming.
        for bad in ["", "ZSTD", " zstd", "brotli"] {
            assert_eq!(CompressionType::from_label(bad), None, "input: {bad:?}");
        }
    }

    // ── compression_supported ───────────────────────────────────────────────

    #[test]
    fn parquet_supports_every_codec() {
        for codec in ALL_CODECS {
            assert!(
                compression_supported(FormatType::Parquet, codec),
                "parquet must accept '{}'",
                codec.label()
            );
        }
    }

    #[test]
    fn csv_supports_only_none() {
        for codec in ALL_CODECS {
            assert_eq!(
                compression_supported(FormatType::Csv, codec),
                codec == CompressionType::None,
                "csv + '{}'",
                codec.label()
            );
        }
    }

    // ── CompressionProfile::to_codec ────────────────────────────────────────

    #[test]
    fn compression_profile_to_codec_mapping() {
        assert_eq!(
            CompressionProfile::None.to_codec(),
            (CompressionType::None, None)
        );
        assert_eq!(
            CompressionProfile::Fast.to_codec(),
            (CompressionType::Snappy, None)
        );
        assert_eq!(
            CompressionProfile::Balanced.to_codec(),
            (CompressionType::Zstd, Some(3))
        );
        assert_eq!(
            CompressionProfile::Compact.to_codec(),
            (CompressionType::Zstd, Some(9))
        );
    }

    // ── L24: compression_profile override warning ───────────────────────────

    #[test]
    fn profile_override_warns_on_conflicting_explicit_codec() {
        // `compression_profile: fast` (snappy) with explicit `compression: gzip`
        // must warn, naming both codecs and the winner.
        let msg = compression_profile_override_warning(
            CompressionProfile::Fast,
            Some(CompressionType::Gzip),
            None,
        )
        .expect("conflicting explicit codec must warn");
        assert!(msg.contains("fast"), "got: {msg}");
        assert!(msg.contains("gzip"), "got: {msg}");
        assert!(msg.contains("snappy"), "winner codec named, got: {msg}");
    }

    #[test]
    fn profile_override_warns_on_explicit_level() {
        let msg = compression_profile_override_warning(CompressionProfile::Balanced, None, Some(7))
            .expect("explicit compression_level under a profile must warn");
        assert!(msg.contains("compression_level"), "got: {msg}");
    }

    #[test]
    fn profile_override_silent_when_codec_matches_profile() {
        // Profile `fast` resolves to snappy; an explicit `snappy` is not a
        // conflict, so no warning.
        assert!(
            compression_profile_override_warning(
                CompressionProfile::Fast,
                Some(CompressionType::Snappy),
                None,
            )
            .is_none()
        );
    }

    #[test]
    fn profile_override_silent_when_nothing_explicit() {
        assert!(
            compression_profile_override_warning(CompressionProfile::Compact, None, None).is_none()
        );
    }

    // ── ParquetConfig::effective_row_group_rows ─────────────────────────────

    /// Two fixed-width, non-nullable columns.
    fn narrow_schema() -> arrow::datatypes::SchemaRef {
        use arrow::datatypes::{DataType, Field, Schema};
        std::sync::Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("created_at", DataType::Int64, false),
        ]))
    }

    /// Fifty nullable string columns.
    fn wide_schema() -> arrow::datatypes::SchemaRef {
        use arrow::datatypes::{DataType, Field, Schema};
        let fields: Vec<Field> = (0..50)
            .map(|i| Field::new(format!("col{i}"), DataType::Utf8, true))
            .collect();
        std::sync::Arc::new(Schema::new(fields))
    }

    #[test]
    fn parquet_config_fixed_rows_returns_explicit_count() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: Some(250_000),
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), Some(250_000));
    }

    #[test]
    fn parquet_config_fixed_rows_without_row_group_rows_returns_none() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedRows),
            row_group_rows: None,
            ..Default::default()
        };
        assert_eq!(pc.effective_row_group_rows(&narrow_schema()), None);
    }

    #[test]
    fn parquet_config_auto_narrow_table_produces_large_groups() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows >= 1_000_000,
            "narrow table should get large groups, got {rows}"
        );
    }

    #[test]
    fn parquet_config_auto_wide_table_produces_smaller_groups() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&wide_schema()).unwrap();
        assert!(
            rows < 100_000,
            "wide table should get smaller groups, got {rows}"
        );
        assert!(rows >= 1_000, "should be at least the minimum, got {rows}");
    }

    #[test]
    fn parquet_config_max_row_group_mb_caps_result() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(128),
            max_row_group_mb: Some(1),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows <= 100_000,
            "max_row_group_mb should cap rows, got {rows}"
        );
    }

    #[test]
    fn parquet_config_deserializes_from_yaml() {
        let yaml = "row_group_strategy: auto\ntarget_row_group_mb: 64\n";
        let pc: ParquetConfig = serde_yaml_ng::from_str(yaml).unwrap();
        assert_eq!(pc.row_group_strategy, Some(RowGroupStrategy::Auto));
        assert_eq!(pc.target_row_group_mb, Some(64));
    }

    #[test]
    fn parquet_config_fixed_memory_same_math_as_auto() {
        let auto_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        let fixed_mem_pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::FixedMemory),
            target_row_group_mb: Some(64),
            ..Default::default()
        };
        assert_eq!(
            auto_pc.effective_row_group_rows(&narrow_schema()),
            fixed_mem_pc.effective_row_group_rows(&narrow_schema()),
            "FixedMemory and Auto must produce identical row counts for the same target"
        );
        assert_eq!(
            auto_pc.effective_row_group_rows(&wide_schema()),
            fixed_mem_pc.effective_row_group_rows(&wide_schema()),
        );
    }

    #[test]
    fn parquet_config_auto_without_target_uses_default_128mb() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: None,
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&narrow_schema()).unwrap();
        assert!(
            rows >= 1_000_000,
            "default 128 MB target should give large groups for narrow table; got {rows}"
        );
    }

    // Renamed from `..._no_block_gives_none_for_row_group_rows`: the assertion
    // has always been that the default config yields `Some`, not `None`.
    #[test]
    fn parquet_config_default_returns_some_row_group_rows() {
        let pc = ParquetConfig::default();
        let rows = pc.effective_row_group_rows(&narrow_schema());
        assert!(
            rows.is_some(),
            "default ParquetConfig (strategy: None) must return Some, got None"
        );
    }

    #[test]
    fn parquet_config_small_target_clamps_to_minimum_1000_rows() {
        let pc = ParquetConfig {
            row_group_strategy: Some(RowGroupStrategy::Auto),
            target_row_group_mb: Some(1),
            ..Default::default()
        };
        let rows = pc.effective_row_group_rows(&wide_schema()).unwrap();
        assert!(
            rows >= 1_000,
            "must not go below minimum 1 000 rows; got {rows}"
        );
    }
}
449}