Skip to main content

rivet/config/
export.rs

1//! Per-export configuration: query/table/mode, chunking, format, destination link.
2//!
3//! `SchemaDriftPolicy` lives here because it is only ever read via
4//! [`ExportConfig::on_schema_drift`].
5
6use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// How deep `--validate` must verify each part's integrity.
///
/// ```yaml
/// exports:
///   - name: orders
///     verify: content   # size (default), content
/// ```
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum VerifyMode {
    /// Accept size-only verification when no content checksum is available.
    #[default]
    Size,
    /// Require every part's content to be MD5-verified against the store's
    /// listing; fail validation for any part that is only size-verified.
    Content,
}

impl VerifyMode {
    /// Whether content (not just size) verification is required.
    ///
    /// Returns `true` only for [`VerifyMode::Content`].
    pub fn requires_content(self) -> bool {
        matches!(self, VerifyMode::Content)
    }
}

/// What to do when structural schema drift is detected (column added, removed, or retyped).
///
/// ```yaml
/// exports:
///   - name: orders
///     on_schema_drift: fail   # warn (default), continue, fail
/// ```
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    /// Log a warning and continue. The new schema fingerprint is stored. (Default.)
    #[default]
    Warn,
    /// Silently accept schema changes — store the new schema, no log output.
    Continue,
    /// Abort the run with a non-zero exit. The schema store is NOT updated so the
    /// next run will detect the same change again.
    Fail,
}
/// Configuration of a single export: what to read (`query` / `query_file` /
/// `table`), how to read it (`mode`, chunking, partitioning), and how and
/// where to write it (`format`, compression, `destination`).
///
/// Unknown keys are rejected at parse time (`deny_unknown_fields`).
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; used to identify the export in error and log messages.
    pub name: String,
    /// Inline SQL text. Mutually exclusive with `query_file` and `table`.
    #[serde(default)]
    pub query: Option<String>,
    /// Path to a SQL file, resolved relative to the config directory.
    /// Absolute paths and `..` components are rejected by
    /// [`ExportConfig::resolve_query`]. Mutually exclusive with `query` and `table`.
    pub query_file: Option<String>,
    /// Shortcut for `query: "SELECT * FROM <schema>.<table>"`.
    ///
    /// Accepts `table` or `schema.table` with ASCII-only identifiers
    /// (`[A-Za-z_][A-Za-z0-9_]*`). Generates an unquoted single-table
    /// query so the Postgres NUMERIC catalog-hint resolver recognises it
    /// and auto-types `numeric(p,s)` columns without manual overrides.
    ///
    /// Mutually exclusive with `query` and `query_file`.
    #[serde(default)]
    pub table: Option<String>,
    /// Export mode; defaults to `full` when omitted.
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    /// Change-data-capture settings, required when `mode: cdc`. Reuses the
    /// export's `table`, `destination`, and `format`; carries only the
    /// CDC-specific knobs (resume checkpoint, per-engine stream params).
    #[serde(default)]
    pub cdc: Option<CdcExportConfig>,
    /// Primary cursor column for incremental progression.
    // NOTE(review): presumably required for `mode: incremental` — confirm against the validator.
    pub cursor_column: Option<String>,
    /// Secondary column for [`IncrementalCursorMode::Coalesce`] only (see ADR-0007).
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    /// How primary (and optional fallback) columns drive incremental progression.
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    pub chunk_column: Option<String>,
    #[serde(default)]
    pub chunk_dense: bool,
    /// Rows per chunk; defaults to 100,000 when omitted.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Target memory budget per chunk in MB. When set, `chunk_size` is derived
    /// from this budget at plan-build time using a `pg_class` row-size estimate
    /// (`pg_relation_size / reltuples`), clamped to `[10_000, 5_000_000]` rows.
    ///
    /// Mutually exclusive with an explicit non-default `chunk_size:`. Only
    /// applies to `mode: chunked` on a Postgres source using the `table:`
    /// shortcut (the row-size probe needs a known relation).
    ///
    /// ```yaml
    /// exports:
    ///   - name: page_views
    ///     table: public.page_views
    ///     mode: chunked
    ///     chunk_size_memory_mb: 256
    /// ```
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    /// Divide the column range into exactly this many equal chunks.
    /// Mutually exclusive with `chunk_dense` and `chunk_by_days`.
    /// When set, `chunk_size` is computed dynamically from min/max.
    pub chunk_count: Option<usize>,
    pub chunk_by_days: Option<u32>,
    /// Keyset (seek) pagination on this single index-backed unique key — the
    /// source-safe shape for tables without a single-integer PK (OPT-4). The
    /// column MUST be backed by a usable index (PK or unique); the planner
    /// refuses a non-indexed key rather than emit a full-scan + filesort query.
    pub chunk_by_key: Option<String>,
    /// Defaults to `1` when omitted.
    #[serde(default = "default_parallel")]
    pub parallel: usize,

    /// Advisory execution wave (1 = highest priority, run first). Written by
    /// `rivet plan` from the source-aware prioritization score (see ADR-0006)
    /// and consumed by `rivet apply`, which runs exports wave-by-wave in
    /// ascending order. `None` = unscheduled (apply treats it as the last wave).
    /// Operators may hand-edit it; a later `rivet plan` refreshes it in place.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave: Option<u32>,

    /// Whether this export is cheap enough to run concurrently with its
    /// wave-mates under `rivet apply --parallel-export-processes`. Written by
    /// `rivet plan` (true when the source-aware cost class is `Low`, i.e.
    /// < ~100K rows); a heavier table already chunk-parallelizes internally, so
    /// two of them at once would overload the source. `None`/`false` → the
    /// export runs alone within its wave. Operators may hand-edit it; a later
    /// `rivet plan` refreshes it in place.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parallel_safe: Option<bool>,
    pub time_column: Option<String>,
    /// Defaults to `timestamp` when omitted.
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    pub days_window: Option<u32>,

    /// Date/time output partitioning: split this export's rows into one
    /// destination sub-prefix per calendar bucket of this **DATE or TIMESTAMP**
    /// column, bucketed by [`partition_granularity`](Self::partition_granularity)
    /// (`day` / `month` / `year`), in a Hive-style `col=value/` layout
    /// (`created_at=2023-01-01/`, `created_at=2023-01/`, `created_at=2023/`).
    /// Requires a `{partition}` token in `destination.path` /
    /// `destination.prefix`.
    ///
    /// This is **not** arbitrary value partitioning: the column's min/max is
    /// read and parsed as a date to generate contiguous calendar buckets, so a
    /// non-temporal column (e.g. `partition_by: status`) fails at run time with
    /// "could not parse partition min '<value>' from column '<col>' as a date".
    /// To split by a categorical column, write one export per value with a
    /// `WHERE` filter instead.
    ///
    /// Orthogonal to `mode`: each partition runs the export's own mode, so
    /// `mode: chunked` chunks *within* a day. Rows whose partition column is
    /// NULL land in `col=__HIVE_DEFAULT_PARTITION__/` (Hive default partition)
    /// so no row is silently dropped. Not compatible with `mode: time_window`.
    ///
    /// ```yaml
    /// exports:
    ///   - name: events
    ///     table: events
    ///     partition_by: created_at        # must be a DATE or TIMESTAMP column
    ///     partition_granularity: day
    ///     destination:
    ///       type: s3
    ///       bucket: my-bucket
    ///       prefix: "events/{partition}/"   # → events/created_at=2023-01-01/
    /// ```
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Calendar bucket width for [`partition_by`](Self::partition_by):
    /// `day` (default), `month`, or `year`. Determines how the partition
    /// column's date/timestamp range is split into contiguous Hive buckets
    /// (`col=2023-01-01/` / `col=2023-01/` / `col=2023/`). Has no effect
    /// unless `partition_by` is set.
    #[serde(default)]
    pub partition_granularity: PartitionGranularity,
    /// Output file format. Required (no serde default).
    pub format: FormatType,
    /// Compression codec. Ignored when `compression_profile` is set — see
    /// [`ExportConfig::effective_compression`].
    #[serde(default)]
    pub compression: CompressionType,
    /// Explicit codec level. Ignored when `compression_profile` is set.
    pub compression_level: Option<u32>,
    /// Named compression profile; takes precedence over `compression` and
    /// `compression_level`.
    pub compression_profile: Option<CompressionProfile>,
    #[serde(default)]
    pub skip_empty: bool,
    /// Where this export's output is written.
    pub destination: DestinationConfig,
    /// Integrity depth required of `--validate` for this export's parts.
    /// `size` (default) accepts size-only verification; `content` requires every
    /// part's content MD5 to be checked against the store's listing (no
    /// download) and **fails** validation for any part that could only be
    /// size-verified — e.g. a part too large to upload as a single PUT (lower
    /// `max_file_size` so it fits), or a backend that exposes no checksum.
    #[serde(default)]
    pub verify: VerifyMode,
    #[serde(default)]
    pub meta_columns: MetaColumns,
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Rotate to a new part when the current file reaches this size.
    /// Accepts `B`/`KB`/`MB`/`GB` (case-insensitive) or a bare byte count;
    /// a fractional value is allowed (`1.5GB`). Units are binary (IEC-style):
    /// `KB` = 1024 bytes, `MB` = 1024 KB, `GB` = 1024 MB. Example: `256MB`.
    pub max_file_size: Option<String>,
    #[serde(default)]
    pub chunk_checkpoint: bool,
    pub chunk_max_attempts: Option<u32>,
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    /// Optional logical group for shared source capacity (replica, host). Advisory prioritization only.
    #[serde(default)]
    pub source_group: Option<String>,
    /// Hint (Epic C / ADR-0006) that this export should always be treated as reconcile-heavy
    /// by planning, independent of the `--reconcile` CLI flag. Advisory only.
    #[serde(default)]
    pub reconcile_required: bool,

    /// Per-column type overrides (roadmap §8). Keys are column names; values
    /// are short type strings such as `decimal(18,2)`, `timestamp_tz`, `json`.
    ///
    /// ```yaml
    /// exports:
    ///   - name: payments
    ///     columns:
    ///       amount: decimal(18,2)
    ///       fee: decimal(18,6)
    ///       created_at: timestamp_tz
    /// ```
    ///
    /// Overrides take priority over autodetection and are validated at
    /// plan time — an invalid type string fails before the export runs.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    /// Downstream warehouse this export targets (`bigquery` / `bq`,
    /// `duckdb`). When set, `rivet check --type-report` resolves each column
    /// against it (native type, honest autoload type, recovery hint) without
    /// needing `--target` on the CLI — the CLI flag still wins when both are
    /// present. The Parquet interchange stays target-neutral (ADR-0014 T2);
    /// `target:` only drives guidance and the future load-schema artifact.
    ///
    /// ```yaml
    /// exports:
    ///   - name: payments
    ///     target: bigquery
    /// ```
    #[serde(default)]
    pub target: Option<String>,

    /// Policy applied when structural schema drift is detected (column added, removed, or retyped).
    /// Defaults to `warn`: log a warning and continue.
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    /// Growth-factor threshold for data shape drift warnings (Epic 8).
    /// When a string/binary column's max observed byte length in the current run
    /// exceeds `stored_max * shape_drift_warn_factor`, Rivet logs a warning.
    /// `None` uses the default of 2.0. Set to `0.0` to disable shape tracking.
    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    /// Parquet row group tuning. Only meaningful when `format: parquet`.
    /// When absent, the parquet library default (1,048,576 rows/group) is used.
    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
271
272impl ExportConfig {
273    /// Resolve the effective `(CompressionType, level)` for this export.
274    /// `compression_profile` takes precedence over `compression` + `compression_level`.
275    ///
276    /// L24: when a profile is set *and* a conflicting explicit codec/level was
277    /// written, warn once that the profile wins rather than silently dropping the
278    /// explicit choice. An explicit codec is only detectable when it differs from
279    /// the `#[serde(default)]` (Zstd) — a literal `compression: zstd` alongside a
280    /// profile is indistinguishable from an omitted field and stays silent.
281    pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
282        if let Some(profile) = self.compression_profile {
283            let explicit_codec =
284                (self.compression != CompressionType::default()).then_some(self.compression);
285            if let Some(msg) = super::format::compression_profile_override_warning(
286                profile,
287                explicit_codec,
288                self.compression_level,
289            ) {
290                log::warn!("export '{}': {}", self.name, msg);
291            }
292            profile.to_codec()
293        } else {
294            (self.compression, self.compression_level)
295        }
296    }
297
298    pub fn max_file_size_bytes(&self) -> Option<u64> {
299        self.max_file_size
300            .as_ref()
301            .and_then(|s| parse_file_size(s).ok())
302    }
303
304    pub fn resolve_query(
305        &self,
306        config_dir: &Path,
307        params: Option<&std::collections::HashMap<String, String>>,
308    ) -> crate::error::Result<String> {
309        // table: shortcut takes precedence — already validated by
310        // `validate_business_rules` to be mutually exclusive with query/query_file.
311        if let Some(tbl) = &self.table {
312            validate_table_shortcut_ident(&self.name, tbl)?;
313            return Ok(format!("SELECT * FROM {tbl}"));
314        }
315        match (&self.query, &self.query_file) {
316            (Some(q), None) => {
317                if params.is_some() {
318                    resolve_vars(q, params)
319                } else {
320                    Ok(q.clone())
321                }
322            }
323            (None, Some(file)) => {
324                let file_path = std::path::Path::new(file);
325                // SecOps: block absolute paths and `..` traversal components.
326                if file_path.is_absolute() {
327                    anyhow::bail!(
328                        "export '{}': query_file must be a relative path: '{}'",
329                        self.name,
330                        file
331                    );
332                }
333                if file_path
334                    .components()
335                    .any(|c| c == std::path::Component::ParentDir)
336                {
337                    anyhow::bail!(
338                        "export '{}': query_file path must not contain '..': '{}'",
339                        self.name,
340                        file
341                    );
342                }
343                let joined = config_dir.join(file);
344                // Canonicalize-based check catches symlink-based evasion for files
345                // that already exist on disk.
346                if let Ok(canonical) = joined.canonicalize() {
347                    let base = config_dir
348                        .canonicalize()
349                        .unwrap_or_else(|_| config_dir.to_path_buf());
350                    if !canonical.starts_with(&base) {
351                        anyhow::bail!(
352                            "export '{}': query_file '{}' resolves outside the config directory",
353                            self.name,
354                            file
355                        );
356                    }
357                }
358                let raw = std::fs::read_to_string(&joined)?;
359                resolve_vars(&raw, params)
360            }
361            (Some(_), Some(_)) => {
362                anyhow::bail!(
363                    "export '{}': specify either 'query' or 'query_file', not both",
364                    self.name
365                )
366            }
367            (None, None) => {
368                anyhow::bail!(
369                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
370                    self.name
371                )
372            }
373        }
374    }
375}
376
377/// Validate the value of the `table:` YAML shortcut.
378///
379/// Accepts ASCII identifiers in the form `<table>` or `<schema>.<table>`. Each
380/// segment must match `[A-Za-z_][A-Za-z0-9_]*`. Anything else (quoted
381/// identifiers, exotic chars, three-part names, SQL injection attempts) is
382/// rejected — the user should fall back to `query:` for those cases.
383///
384/// The bound on identifier shape keeps generated SQL safe to interpolate
385/// without quoting and ensures the generated `SELECT * FROM <ident>` form is
386/// recognised by the PG catalog-hint parser ([src/source/postgres.rs]).
387fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
388    let trimmed = raw.trim();
389    if trimmed.is_empty() {
390        anyhow::bail!("export '{export_name}': 'table' is empty");
391    }
392    let parts: Vec<&str> = trimmed.split('.').collect();
393    if parts.len() > 2 {
394        anyhow::bail!(
395            "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
396        );
397    }
398    for part in &parts {
399        if part.is_empty() {
400            anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
401        }
402        let mut chars = part.chars();
403        let first = chars.next().unwrap();
404        if !(first.is_ascii_alphabetic() || first == '_') {
405            anyhow::bail!(
406                "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
407            );
408        }
409        if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
410            anyhow::bail!(
411                "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
412            );
413        }
414    }
415    Ok(())
416}
417
/// Optional data-quality checks for an export ([`ExportConfig::quality`]).
///
/// Unknown keys are rejected at parse time (`deny_unknown_fields`).
// NOTE(review): the per-field semantics below are inferred from field names —
// confirm against the quality checker before relying on them.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    /// Presumably the minimum acceptable row count.
    pub row_count_min: Option<usize>,
    /// Presumably the maximum acceptable row count.
    pub row_count_max: Option<usize>,
    /// Map from column name to a ratio; presumably the maximum tolerated
    /// fraction of NULLs for that column. Empty when omitted.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    /// Columns subject to uniqueness checks. Empty when omitted.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    /// Cap on the number of distinct values tracked per column during uniqueness checks.
    /// When the limit is hit, a Warn issue is emitted and tracking stops for that column.
    /// Prevents unbounded HashSet growth on high-cardinality columns.
    pub unique_max_entries: Option<usize>,
}
432
/// Opt-in metadata column switches for an export ([`ExportConfig::meta_columns`]).
/// Both flags default to `false`; unknown keys are rejected at parse time.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    // NOTE(review): presumably adds a column holding the export timestamp — confirm against the writer.
    #[serde(default)]
    pub exported_at: bool,
    // NOTE(review): presumably adds a per-row hash column — confirm against the writer.
    #[serde(default)]
    pub row_hash: bool,
}
441
/// Serde default for [`ExportConfig::mode`]: [`ExportMode::Full`].
fn default_mode() -> ExportMode {
    ExportMode::Full
}
445
/// Serde default for `ExportConfig::chunk_size`: 100,000 rows.
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_ROWS: usize = 100_000;
    DEFAULT_CHUNK_ROWS
}
449
/// Serde default for `ExportConfig::parallel`: `1`.
fn default_parallel() -> usize {
    const DEFAULT_PARALLEL: usize = 1;
    DEFAULT_PARALLEL
}
453
/// Serde default for [`ExportConfig::time_column_type`]: [`TimeColumnType::Timestamp`].
fn default_time_column_type() -> TimeColumnType {
    TimeColumnType::Timestamp
}
457
/// How an export reads its source ([`ExportConfig::mode`]).
///
/// Written in snake_case in config: `full`, `incremental`, `chunked`,
/// `time_window`, `cdc`.
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    /// The default mode when `mode:` is omitted.
    Full,
    // NOTE(review): presumably driven by `cursor_column` / `incremental_cursor_mode` — confirm.
    Incremental,
    // NOTE(review): presumably driven by the `chunk_*` settings — confirm.
    Chunked,
    // NOTE(review): presumably driven by `time_column` / `days_window` — confirm.
    TimeWindow,
    /// Log-based change data capture (see [`CdcExportConfig`]): stream
    /// INSERT/UPDATE/DELETE from the source's transaction log instead of querying
    /// the table. Reuses the export's `table` / `destination` / `format`.
    Cdc,
}
470
/// Per-export CDC settings, required when `mode: cdc`. The output `table`,
/// `destination`, and `format` come from the export itself; this carries only the
/// CDC-specific knobs (resume + per-engine stream params).
///
/// Every field is optional; `until_current` defaults to `false`.
// NOTE(review): unlike `ExportConfig`, `QualityConfig`, and `MetaColumns`, this
// struct does not set `#[serde(deny_unknown_fields)]`, so misspelled keys under
// `cdc:` are silently ignored — confirm whether that is intentional.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
pub struct CdcExportConfig {
    /// Persist/resume the source log position to this file. Omit to tail from the
    /// current position without checkpointing.
    pub checkpoint: Option<String>,
    /// Catch up to the source's current end and exit (a bounded run), instead of
    /// streaming indefinitely — ideal for a scheduler. For MySQL this is a
    /// non-blocking binlog dump; PostgreSQL / SQL Server already drain-and-exit.
    #[serde(default)]
    pub until_current: bool,
    /// Stop after N change events (default: until end of stream / interrupted).
    pub max_events: Option<usize>,
    /// Rows per output part file (default 10000). A part also rolls at a
    /// transaction boundary, so it never splits a transaction.
    pub rollover: Option<usize>,
    /// Roll a part once its buffered changes reach this many MB, whichever comes
    /// first with `rollover`. Caps the in-memory buffer and the part file size by
    /// bytes instead of a fixed row count — predictable for tables with wide
    /// (large JSON / blob) rows, mirroring the batch path's `batch_size_memory_mb`.
    pub rollover_memory_mb: Option<usize>,
    /// MySQL replica server-id for the binlog connection (default 4271; must be
    /// distinct from the source's and any other replica).
    pub server_id: Option<u32>,
    /// PostgreSQL logical replication slot name (default `rivet_slot`).
    pub slot: Option<String>,
    /// SQL Server CDC capture instance, e.g. `dbo_orders` — required for
    /// `sqlserver://` sources.
    pub capture_instance: Option<String>,
}
503
/// Kind of value stored in [`ExportConfig::time_column`]
/// ([`ExportConfig::time_column_type`]).
///
/// Written in lowercase in config: `timestamp` (the default) or `unix`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    /// The default when `time_column_type:` is omitted.
    Timestamp,
    // NOTE(review): presumably a numeric Unix epoch value; the unit (seconds vs
    // milliseconds) is not visible here — confirm.
    Unix,
}
510
/// Calendar bucket width for date/timestamp output partitioning
/// ([`ExportConfig::partition_by`]). The partition column must be a DATE or
/// TIMESTAMP column; this picks how its range is split into contiguous Hive
/// buckets. It is not a knob for partitioning by arbitrary column values.
///
/// Written in lowercase in config: `day`, `month`, `year`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PartitionGranularity {
    /// One bucket per calendar day (`col=2023-01-01/`). Default.
    #[default]
    Day,
    /// One bucket per calendar month (`col=2023-01/`).
    Month,
    /// One bucket per calendar year (`col=2023/`).
    Year,
}
526
/// Baseline [`ExportConfig`] shared by tests across the crate.
///
/// Every field is spelled out here, in declaration order, so a newly added
/// field needs exactly one edit (and the compiler rejects this literal until
/// it is made). Tests start from this value and override only what they
/// exercise — see the `plan::build` and `preflight` tests.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    ExportConfig {
        // ── identity and source ──
        name: name.into(),
        query: Some("SELECT 1".into()),
        query_file: None,
        table: None,
        mode: ExportMode::Full,
        cdc: None,

        // ── incremental cursor ──
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),

        // ── chunking and scheduling ──
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        wave: None,
        parallel_safe: None,

        // ── time window and partitioning ──
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,

        // ── output ──
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination: crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            path: Some("/tmp".into()),
            ..Default::default()
        },
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,

        // ── resilience, planning hints, typing ──
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590
591    // ── ExportConfig::max_file_size_bytes ───────────────────────────────────
592
593    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
594        let yaml = format!(
595            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n  type: local\n  path: /tmp\n{extra}"
596        );
597        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
598    }
599
600    #[test]
601    fn max_file_size_bytes_none_when_unset() {
602        let exp = make_export_yaml("no_limit", "");
603        assert!(exp.max_file_size_bytes().is_none());
604    }
605
606    #[test]
607    fn max_file_size_bytes_parses_mb() {
608        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
609        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
610    }
611
612    #[test]
613    fn max_file_size_bytes_parses_gb() {
614        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
615        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
616    }
617
618    #[test]
619    fn max_file_size_bytes_returns_none_on_invalid() {
620        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
621        assert!(exp.max_file_size_bytes().is_none());
622    }
623
624    // ── ExportConfig::resolve_query ─────────────────────────────────────────
625
626    // Build a minimal ExportConfig directly, bypassing Config::from_yaml validation.
627    // This lets us test the four branches inside resolve_query itself, including
628    // the (both-set / neither-set) error paths that are normally prevented by the
629    // top-level validator.
630    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
631        ExportConfig {
632            query: query.map(|s| s.to_string()),
633            query_file: query_file.map(|s| s.to_string()),
634            ..sample_export("test")
635        }
636    }
637
638    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
639        pairs
640            .iter()
641            .map(|(k, v)| (k.to_string(), v.to_string()))
642            .collect()
643    }
644
645    #[test]
646    fn resolve_query_inline_no_params_returns_query_as_is() {
647        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
648        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
649        assert_eq!(q, "SELECT id FROM orders");
650    }
651
652    #[test]
653    fn resolve_query_inline_with_params_substitutes_vars() {
654        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
655        let p = params(&[("col", "id"), ("table", "orders")]);
656        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
657        assert_eq!(q, "SELECT id FROM orders");
658    }
659
660    #[test]
661    fn resolve_query_inline_params_empty_map_is_noop() {
662        let exp = make_export_direct(Some("SELECT 1"), None);
663        let p = params(&[]);
664        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
665        assert_eq!(q, "SELECT 1");
666    }
667
668    #[test]
669    fn resolve_query_inline_missing_var_returns_error() {
670        // SAFETY: test-only; this binary is single-threaded in the test runner context.
671        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
672        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
673        let p = params(&[]);
674        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
675        assert!(result.is_err());
676        let msg = format!("{:#}", result.unwrap_err());
677        assert!(
678            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
679            "got: {msg}"
680        );
681    }
682
683    #[test]
684    fn resolve_query_file_reads_content() {
685        let dir = tempfile::TempDir::new().unwrap();
686        let sql_path = dir.path().join("query.sql");
687        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
688        let exp = make_export_direct(None, Some("query.sql"));
689        let q = exp.resolve_query(dir.path(), None).unwrap();
690        assert_eq!(q, "SELECT * FROM customers");
691    }
692
693    #[test]
694    fn resolve_query_file_with_params_substitutes() {
695        let dir = tempfile::TempDir::new().unwrap();
696        let sql_path = dir.path().join("q.sql");
697        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
698        let exp = make_export_direct(None, Some("q.sql"));
699        let p = params(&[("col", "name"), ("tbl", "users")]);
700        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
701        assert_eq!(q, "SELECT name FROM users");
702    }
703
704    // ── `table:` shortcut ───────────────────────────────────────────────────
705
706    #[test]
707    fn resolve_query_table_shortcut_qualified() {
708        let mut exp = make_export_direct(None, None);
709        exp.table = Some("public.users".into());
710        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
711        assert_eq!(q, "SELECT * FROM public.users");
712    }
713
714    #[test]
715    fn resolve_query_table_shortcut_unqualified() {
716        let mut exp = make_export_direct(None, None);
717        exp.table = Some("orders".into());
718        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
719        assert_eq!(q, "SELECT * FROM orders");
720    }
721
722    #[test]
723    fn resolve_query_table_shortcut_rejects_three_part_name() {
724        let mut exp = make_export_direct(None, None);
725        exp.table = Some("db.public.users".into());
726        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
727        let msg = format!("{err:#}");
728        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
729    }
730
731    #[test]
732    fn resolve_query_table_shortcut_rejects_sql_injection() {
733        for bad in [
734            "users; DROP TABLE x",
735            "users--",
736            "users'",
737            "users\"",
738            "public.\"My Table\"",
739            "0starts_with_digit",
740            "",
741            ".trailing",
742            "leading.",
743            "two..dots",
744        ] {
745            let mut exp = make_export_direct(None, None);
746            exp.table = Some(bad.into());
747            assert!(
748                exp.resolve_query(Path::new("/tmp"), None).is_err(),
749                "should reject `table:` value '{bad}'",
750            );
751        }
752    }
753
754    #[test]
755    fn resolve_query_table_shortcut_takes_precedence_over_query() {
756        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
757        exp.table = Some("public.y".into());
758        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
759        assert_eq!(q, "SELECT * FROM public.y");
760    }
761
762    #[test]
763    fn resolve_query_file_missing_returns_error() {
764        let dir = tempfile::TempDir::new().unwrap();
765        let exp = make_export_direct(None, Some("nonexistent.sql"));
766        let result = exp.resolve_query(dir.path(), None);
767        assert!(result.is_err());
768        let msg = format!("{:#}", result.unwrap_err());
769        assert!(
770            msg.contains("nonexistent.sql") || msg.contains("No such file"),
771            "got: {msg}"
772        );
773    }
774
775    #[test]
776    fn resolve_query_both_set_returns_error() {
777        let mut exp = make_export_direct(Some("SELECT 1"), None);
778        exp.query_file = Some("file.sql".into());
779        let result = exp.resolve_query(Path::new("/tmp"), None);
780        assert!(result.is_err());
781        let msg = format!("{:#}", result.unwrap_err());
782        assert!(
783            msg.contains("not both") || msg.contains("query_file"),
784            "got: {msg}"
785        );
786    }
787
788    #[test]
789    fn resolve_query_neither_set_returns_error() {
790        let exp = make_export_direct(None, None);
791        let result = exp.resolve_query(Path::new("/tmp"), None);
792        assert!(result.is_err());
793        let msg = format!("{:#}", result.unwrap_err());
794        assert!(
795            msg.contains("query") || msg.contains("query_file"),
796            "got: {msg}"
797        );
798    }
799
800    // ── SecOps: query_file path traversal prevention ──────────────────────────
801
802    #[test]
803    fn resolve_query_file_dotdot_is_rejected() {
804        let dir = tempfile::TempDir::new().unwrap();
805        let exp = make_export_direct(None, Some("../secret.sql"));
806        let result = exp.resolve_query(dir.path(), None);
807        assert!(result.is_err());
808        let msg = format!("{:#}", result.unwrap_err());
809        assert!(
810            msg.contains("..") || msg.contains("traversal"),
811            "got: {msg}"
812        );
813    }
814
815    #[test]
816    fn resolve_query_file_nested_dotdot_is_rejected() {
817        let dir = tempfile::TempDir::new().unwrap();
818        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
819        let result = exp.resolve_query(dir.path(), None);
820        assert!(result.is_err());
821        let msg = format!("{:#}", result.unwrap_err());
822        assert!(
823            msg.contains("..") || msg.contains("traversal"),
824            "got: {msg}"
825        );
826    }
827
828    #[test]
829    fn resolve_query_file_absolute_path_is_rejected() {
830        let dir = tempfile::TempDir::new().unwrap();
831        let exp = make_export_direct(None, Some("/etc/passwd"));
832        let result = exp.resolve_query(dir.path(), None);
833        assert!(result.is_err());
834        let msg = format!("{:#}", result.unwrap_err());
835        assert!(
836            msg.contains("relative") || msg.contains("absolute"),
837            "got: {msg}"
838        );
839    }
840
841    #[test]
842    fn resolve_query_file_in_subdir_is_allowed() {
843        let dir = tempfile::TempDir::new().unwrap();
844        let subdir = dir.path().join("queries");
845        std::fs::create_dir(&subdir).unwrap();
846        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
847        let exp = make_export_direct(None, Some("queries/orders.sql"));
848        let q = exp.resolve_query(dir.path(), None).unwrap();
849        assert_eq!(q, "SELECT * FROM orders");
850    }
851}