Skip to main content

rivet/config/
export.rs

1//! Per-export configuration: query/table/mode, chunking, format, destination link.
2//!
3//! `SchemaDriftPolicy` lives here because it is only ever read via
4//! [`ExportConfig::on_schema_drift`].
5
6use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// How deep `--validate` must verify each part's integrity.
///
/// ```yaml
/// exports:
///   - name: orders
///     verify: content   # size (default), content
/// ```
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum VerifyMode {
    /// Accept size-only verification when no content checksum is available.
    #[default]
    Size,
    /// Require every part's content to be MD5-verified against the store's
    /// listing; fail validation for any part that is only size-verified.
    Content,
}

impl VerifyMode {
    /// Whether content (not just size) verification is required.
    ///
    /// Returns `true` only for [`VerifyMode::Content`].
    pub fn requires_content(self) -> bool {
        matches!(self, VerifyMode::Content)
    }
}

/// What to do when structural schema drift is detected (column added, removed, or retyped).
///
/// ```yaml
/// exports:
///   - name: orders
///     on_schema_drift: fail   # warn (default), continue, fail
/// ```
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    /// Log a warning and continue. The new schema fingerprint is stored. (Default.)
    #[default]
    Warn,
    /// Silently accept schema changes — store the new schema, no log output.
    Continue,
    /// Abort the run with a non-zero exit. The schema store is NOT updated so the
    /// next run will detect the same change again.
    Fail,
}
/// Configuration for a single entry under `exports:`.
///
/// The SQL comes from exactly one of `query`, `query_file`, or `table` (see
/// [`ExportConfig::resolve_query`]). Unknown keys are rejected at parse time
/// because of `deny_unknown_fields`.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; included in this export's error and log messages.
    pub name: String,
    /// Inline SQL text. Mutually exclusive with `query_file` and `table`.
    #[serde(default)]
    pub query: Option<String>,
    /// Path to a file holding the SQL, resolved relative to the config
    /// directory. [`ExportConfig::resolve_query`] rejects absolute paths and
    /// paths containing `..`. Mutually exclusive with `query` and `table`.
    pub query_file: Option<String>,
    /// Shortcut for `query: "SELECT * FROM <schema>.<table>"`.
    ///
    /// Accepts `table` or `schema.table` with ASCII-only identifiers
    /// (`[A-Za-z_][A-Za-z0-9_]*`). Generates an unquoted single-table
    /// query so the Postgres NUMERIC catalog-hint resolver recognises it
    /// and auto-types `numeric(p,s)` columns without manual overrides.
    ///
    /// Mutually exclusive with `query` and `query_file`.
    #[serde(default)]
    pub table: Option<String>,
    /// Export mode. Defaults to `full` when omitted.
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    /// Change-data-capture settings, required when `mode: cdc`. Reuses the
    /// export's `table`, `destination`, and `format`; carries only the
    /// CDC-specific knobs (resume checkpoint, per-engine stream params).
    #[serde(default)]
    pub cdc: Option<CdcExportConfig>,
    /// NOTE(review): presumably the primary cursor column for incremental
    /// progression (see `incremental_cursor_mode`) — confirm against the planner.
    pub cursor_column: Option<String>,
    /// Secondary column for [`IncrementalCursorMode::Coalesce`] only (see ADR-0007).
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    /// How primary (and optional fallback) columns drive incremental progression.
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    /// NOTE(review): presumably the column whose range is split into chunks
    /// for `mode: chunked` — confirm against the planner.
    pub chunk_column: Option<String>,
    /// Mutually exclusive with `chunk_count`. Defaults to `false`.
    #[serde(default)]
    pub chunk_dense: bool,
    /// Rows per chunk. Defaults to 100_000 when omitted.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Target memory budget per chunk in MB. When set, `chunk_size` is derived
    /// from this budget at plan-build time using a `pg_class` row-size estimate
    /// (`pg_relation_size / reltuples`), clamped to `[10_000, 5_000_000]` rows.
    ///
    /// Mutually exclusive with an explicit non-default `chunk_size:`. Only
    /// applies to `mode: chunked` on a Postgres source using the `table:`
    /// shortcut (the row-size probe needs a known relation).
    ///
    /// ```yaml
    /// exports:
    ///   - name: page_views
    ///     table: public.page_views
    ///     mode: chunked
    ///     chunk_size_memory_mb: 256
    /// ```
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    /// Divide the column range into exactly this many equal chunks.
    /// Mutually exclusive with `chunk_dense` and `chunk_by_days`.
    /// When set, `chunk_size` is computed dynamically from min/max.
    pub chunk_count: Option<usize>,
    /// Mutually exclusive with `chunk_count`.
    /// NOTE(review): presumably chunks by windows of this many days — confirm.
    pub chunk_by_days: Option<u32>,
    /// Keyset (seek) pagination on this single index-backed unique key — the
    /// source-safe shape for tables without a single-integer PK (OPT-4). The
    /// column MUST be backed by a usable index (PK or unique); the planner
    /// refuses a non-indexed key rather than emit a full-scan + filesort query.
    pub chunk_by_key: Option<String>,
    /// Defaults to 1 when omitted.
    /// NOTE(review): presumably the number of concurrent workers — confirm.
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    /// NOTE(review): presumably the column filtered by `mode: time_window` — confirm.
    pub time_column: Option<String>,
    /// Representation of `time_column`. Defaults to `timestamp` when omitted.
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    /// NOTE(review): presumably the look-back window, in days, for
    /// `mode: time_window` — confirm.
    pub days_window: Option<u32>,

    /// Date/time output partitioning: split this export's rows into one
    /// destination sub-prefix per calendar bucket of this **DATE or TIMESTAMP**
    /// column, bucketed by [`partition_granularity`](Self::partition_granularity)
    /// (`day` / `month` / `year`), in a Hive-style `col=value/` layout
    /// (`created_at=2023-01-01/`, `created_at=2023-01/`, `created_at=2023/`).
    /// Requires a `{partition}` token in `destination.path` /
    /// `destination.prefix`.
    ///
    /// This is **not** arbitrary value partitioning: the column's min/max is
    /// read and parsed as a date to generate contiguous calendar buckets, so a
    /// non-temporal column (e.g. `partition_by: status`) fails at run time with
    /// "could not parse partition min '<value>' from column '<col>' as a date".
    /// To split by a categorical column, write one export per value with a
    /// `WHERE` filter instead.
    ///
    /// Orthogonal to `mode`: each partition runs the export's own mode, so
    /// `mode: chunked` chunks *within* a day. Rows whose partition column is
    /// NULL land in `col=__HIVE_DEFAULT_PARTITION__/` (Hive default partition)
    /// so no row is silently dropped. Not compatible with `mode: time_window`.
    ///
    /// ```yaml
    /// exports:
    ///   - name: events
    ///     table: events
    ///     partition_by: created_at        # must be a DATE or TIMESTAMP column
    ///     partition_granularity: day
    ///     destination:
    ///       type: s3
    ///       bucket: my-bucket
    ///       prefix: "events/{partition}/"   # → events/created_at=2023-01-01/
    /// ```
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Calendar bucket width for [`partition_by`](Self::partition_by):
    /// `day` (default), `month`, or `year`. Determines how the partition
    /// column's date/timestamp range is split into contiguous Hive buckets
    /// (`col=2023-01-01/` / `col=2023-01/` / `col=2023/`). Has no effect
    /// unless `partition_by` is set.
    #[serde(default)]
    pub partition_granularity: PartitionGranularity,
    /// Output file format (required), e.g. `parquet`.
    pub format: FormatType,
    /// Compression codec. Superseded by `compression_profile` when that is
    /// set (see [`ExportConfig::effective_compression`]).
    #[serde(default)]
    pub compression: CompressionType,
    /// Optional level for `compression`. Superseded by `compression_profile`
    /// when that is set.
    pub compression_level: Option<u32>,
    /// Named codec/level preset. Takes precedence over `compression` and
    /// `compression_level`.
    pub compression_profile: Option<CompressionProfile>,
    /// Defaults to `false`.
    /// NOTE(review): presumably suppresses output when the export yields no
    /// rows — confirm.
    #[serde(default)]
    pub skip_empty: bool,
    /// Where this export's output is written (required).
    pub destination: DestinationConfig,
    /// Integrity depth required of `--validate` for this export's parts.
    /// `size` (default) accepts size-only verification; `content` requires every
    /// part's content MD5 to be checked against the store's listing (no
    /// download) and **fails** validation for any part that could only be
    /// size-verified — e.g. a part too large to upload as a single PUT (lower
    /// `max_file_size` so it fits), or a backend that exposes no checksum.
    #[serde(default)]
    pub verify: VerifyMode,
    /// Optional extra columns added to the output; see [`MetaColumns`].
    #[serde(default)]
    pub meta_columns: MetaColumns,
    /// Optional data-quality checks; see [`QualityConfig`].
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Rotate to a new part when the current file reaches this size.
    /// Accepts `B`/`KB`/`MB`/`GB` (case-insensitive) or a bare byte count;
    /// a fractional value is allowed (`1.5GB`). Units are binary (IEC-style):
    /// `KB` = 1024 bytes, `MB` = 1024 KB, `GB` = 1024 MB. Example: `256MB`.
    pub max_file_size: Option<String>,
    /// Defaults to `false`.
    /// NOTE(review): presumably enables per-chunk resume checkpoints — confirm.
    #[serde(default)]
    pub chunk_checkpoint: bool,
    /// NOTE(review): presumably the retry cap per chunk — confirm.
    pub chunk_max_attempts: Option<u32>,
    /// Optional per-export tuning settings; see [`TuningConfig`].
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    /// Optional logical group for shared source capacity (replica, host). Advisory prioritization only.
    #[serde(default)]
    pub source_group: Option<String>,
    /// Hint (Epic C / ADR-0006) that this export should always be treated as reconcile-heavy
    /// by planning, independent of the `--reconcile` CLI flag. Advisory only.
    #[serde(default)]
    pub reconcile_required: bool,

    /// Per-column type overrides (roadmap §8). Keys are column names; values
    /// are short type strings such as `decimal(18,2)`, `timestamp_tz`, `json`.
    ///
    /// ```yaml
    /// exports:
    ///   - name: payments
    ///     columns:
    ///       amount: decimal(18,2)
    ///       fee: decimal(18,6)
    ///       created_at: timestamp_tz
    /// ```
    ///
    /// Overrides take priority over autodetection and are validated at
    /// plan time — an invalid type string fails before the export runs.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    /// Downstream warehouse this export targets (`bigquery` / `bq`,
    /// `duckdb`). When set, `rivet check --type-report` resolves each column
    /// against it (native type, honest autoload type, recovery hint) without
    /// needing `--target` on the CLI — the CLI flag still wins when both are
    /// present. The Parquet interchange stays target-neutral (ADR-0014 T2);
    /// `target:` only drives guidance and the future load-schema artifact.
    ///
    /// ```yaml
    /// exports:
    ///   - name: payments
    ///     target: bigquery
    /// ```
    #[serde(default)]
    pub target: Option<String>,

    /// Policy applied when structural schema drift is detected (column added, removed, or retyped).
    /// Defaults to `warn`: log a warning and continue.
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    /// Growth-factor threshold for data shape drift warnings (Epic 8).
    /// When a string/binary column's max observed byte length in the current run
    /// exceeds `stored_max * shape_drift_warn_factor`, Rivet logs a warning.
    /// `None` uses the default of 2.0. Set to `0.0` to disable shape tracking.
    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    /// Parquet row group tuning. Only meaningful when `format: parquet`.
    /// When absent, the parquet library default (1,048,576 rows/group) is used.
    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
253
254impl ExportConfig {
255    /// Resolve the effective `(CompressionType, level)` for this export.
256    /// `compression_profile` takes precedence over `compression` + `compression_level`.
257    ///
258    /// L24: when a profile is set *and* a conflicting explicit codec/level was
259    /// written, warn once that the profile wins rather than silently dropping the
260    /// explicit choice. An explicit codec is only detectable when it differs from
261    /// the `#[serde(default)]` (Zstd) — a literal `compression: zstd` alongside a
262    /// profile is indistinguishable from an omitted field and stays silent.
263    pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
264        if let Some(profile) = self.compression_profile {
265            let explicit_codec =
266                (self.compression != CompressionType::default()).then_some(self.compression);
267            if let Some(msg) = super::format::compression_profile_override_warning(
268                profile,
269                explicit_codec,
270                self.compression_level,
271            ) {
272                log::warn!("export '{}': {}", self.name, msg);
273            }
274            profile.to_codec()
275        } else {
276            (self.compression, self.compression_level)
277        }
278    }
279
280    pub fn max_file_size_bytes(&self) -> Option<u64> {
281        self.max_file_size
282            .as_ref()
283            .and_then(|s| parse_file_size(s).ok())
284    }
285
286    pub fn resolve_query(
287        &self,
288        config_dir: &Path,
289        params: Option<&std::collections::HashMap<String, String>>,
290    ) -> crate::error::Result<String> {
291        // table: shortcut takes precedence — already validated by
292        // `validate_business_rules` to be mutually exclusive with query/query_file.
293        if let Some(tbl) = &self.table {
294            validate_table_shortcut_ident(&self.name, tbl)?;
295            return Ok(format!("SELECT * FROM {tbl}"));
296        }
297        match (&self.query, &self.query_file) {
298            (Some(q), None) => {
299                if params.is_some() {
300                    resolve_vars(q, params)
301                } else {
302                    Ok(q.clone())
303                }
304            }
305            (None, Some(file)) => {
306                let file_path = std::path::Path::new(file);
307                // SecOps: block absolute paths and `..` traversal components.
308                if file_path.is_absolute() {
309                    anyhow::bail!(
310                        "export '{}': query_file must be a relative path: '{}'",
311                        self.name,
312                        file
313                    );
314                }
315                if file_path
316                    .components()
317                    .any(|c| c == std::path::Component::ParentDir)
318                {
319                    anyhow::bail!(
320                        "export '{}': query_file path must not contain '..': '{}'",
321                        self.name,
322                        file
323                    );
324                }
325                let joined = config_dir.join(file);
326                // Canonicalize-based check catches symlink-based evasion for files
327                // that already exist on disk.
328                if let Ok(canonical) = joined.canonicalize() {
329                    let base = config_dir
330                        .canonicalize()
331                        .unwrap_or_else(|_| config_dir.to_path_buf());
332                    if !canonical.starts_with(&base) {
333                        anyhow::bail!(
334                            "export '{}': query_file '{}' resolves outside the config directory",
335                            self.name,
336                            file
337                        );
338                    }
339                }
340                let raw = std::fs::read_to_string(&joined)?;
341                resolve_vars(&raw, params)
342            }
343            (Some(_), Some(_)) => {
344                anyhow::bail!(
345                    "export '{}': specify either 'query' or 'query_file', not both",
346                    self.name
347                )
348            }
349            (None, None) => {
350                anyhow::bail!(
351                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
352                    self.name
353                )
354            }
355        }
356    }
357}
358
359/// Validate the value of the `table:` YAML shortcut.
360///
361/// Accepts ASCII identifiers in the form `<table>` or `<schema>.<table>`. Each
362/// segment must match `[A-Za-z_][A-Za-z0-9_]*`. Anything else (quoted
363/// identifiers, exotic chars, three-part names, SQL injection attempts) is
364/// rejected — the user should fall back to `query:` for those cases.
365///
366/// The bound on identifier shape keeps generated SQL safe to interpolate
367/// without quoting and ensures the generated `SELECT * FROM <ident>` form is
368/// recognised by the PG catalog-hint parser ([src/source/postgres.rs]).
369fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
370    let trimmed = raw.trim();
371    if trimmed.is_empty() {
372        anyhow::bail!("export '{export_name}': 'table' is empty");
373    }
374    let parts: Vec<&str> = trimmed.split('.').collect();
375    if parts.len() > 2 {
376        anyhow::bail!(
377            "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
378        );
379    }
380    for part in &parts {
381        if part.is_empty() {
382            anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
383        }
384        let mut chars = part.chars();
385        let first = chars.next().unwrap();
386        if !(first.is_ascii_alphabetic() || first == '_') {
387            anyhow::bail!(
388                "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
389            );
390        }
391        if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
392            anyhow::bail!(
393                "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
394            );
395        }
396    }
397    Ok(())
398}
399
/// Optional data-quality checks for an export ([`ExportConfig::quality`]).
///
/// Every field is optional; unknown keys are rejected at parse time
/// (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    /// NOTE(review): presumably the minimum acceptable exported row count — confirm.
    pub row_count_min: Option<usize>,
    /// NOTE(review): presumably the maximum acceptable exported row count — confirm.
    pub row_count_max: Option<usize>,
    /// Map keyed by column name; empty when omitted.
    /// NOTE(review): values are presumably the maximum tolerated NULL ratio
    /// for that column — confirm the expected range (0.0–1.0?).
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    /// Columns subject to the uniqueness check; empty when omitted.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    /// Cap on the number of distinct values tracked per column during uniqueness checks.
    /// When the limit is hit, a Warn issue is emitted and tracking stops for that column.
    /// Prevents unbounded HashSet growth on high-cardinality columns.
    pub unique_max_entries: Option<usize>,
}
414
/// Switches for extra metadata columns on an export
/// ([`ExportConfig::meta_columns`]). Both default to `false`; unknown keys are
/// rejected at parse time (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    /// NOTE(review): presumably adds a column holding the export timestamp — confirm.
    #[serde(default)]
    pub exported_at: bool,
    /// NOTE(review): presumably adds a per-row hash column — confirm.
    #[serde(default)]
    pub row_hash: bool,
}
423
/// Serde default for [`ExportConfig::mode`]: [`ExportMode::Full`].
fn default_mode() -> ExportMode {
    ExportMode::Full
}
427
/// Serde default for [`ExportConfig::chunk_size`]: 100_000.
fn default_chunk_size() -> usize {
    100_000
}
431
/// Serde default for [`ExportConfig::parallel`]: 1.
fn default_parallel() -> usize {
    1
}
435
/// Serde default for [`ExportConfig::time_column_type`]: [`TimeColumnType::Timestamp`].
fn default_time_column_type() -> TimeColumnType {
    TimeColumnType::Timestamp
}
439
/// How an export reads its source ([`ExportConfig::mode`]).
///
/// Written in YAML as `full`, `incremental`, `chunked`, `time_window`, or
/// `cdc` (`rename_all = "snake_case"`). This enum is deserialize-only: it does
/// not derive `Serialize`.
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    /// The mode used when `mode:` is omitted.
    Full,
    /// NOTE(review): presumably cursor-driven; see
    /// [`ExportConfig::incremental_cursor_mode`] — confirm.
    Incremental,
    /// Chunked export; see the `chunk_*` fields on [`ExportConfig`].
    Chunked,
    /// Not compatible with [`ExportConfig::partition_by`].
    TimeWindow,
    /// Log-based change data capture (see [`CdcExportConfig`]): stream
    /// INSERT/UPDATE/DELETE from the source's transaction log instead of querying
    /// the table. Reuses the export's `table` / `destination` / `format`.
    Cdc,
}
452
/// Per-export CDC settings, required when `mode: cdc`. The output `table`,
/// `destination`, and `format` come from the export itself; this carries only the
/// CDC-specific knobs (resume + per-engine stream params).
///
/// Every field is optional, and `Default` yields all-`None` / `false`.
///
/// NOTE(review): unlike [`ExportConfig`], [`QualityConfig`] and
/// [`MetaColumns`], this struct is not marked `deny_unknown_fields`, so a
/// misspelled key under `cdc:` is presumably ignored rather than rejected —
/// confirm that is intended.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
pub struct CdcExportConfig {
    /// Persist/resume the source log position to this file. Omit to tail from the
    /// current position without checkpointing.
    pub checkpoint: Option<String>,
    /// Catch up to the source's current end and exit (a bounded run), instead of
    /// streaming indefinitely — ideal for a scheduler. For MySQL this is a
    /// non-blocking binlog dump; PostgreSQL / SQL Server already drain-and-exit.
    #[serde(default)]
    pub until_current: bool,
    /// Stop after N change events (default: until end of stream / interrupted).
    pub max_events: Option<usize>,
    /// Rows per output part file (default 10000). A part also rolls at a
    /// transaction boundary, so it never splits a transaction.
    pub rollover: Option<usize>,
    /// Roll a part once its buffered changes reach this many MB, whichever comes
    /// first with `rollover`. Caps the in-memory buffer and the part file size by
    /// bytes instead of a fixed row count — predictable for tables with wide
    /// (large JSON / blob) rows, mirroring the batch path's `batch_size_memory_mb`.
    pub rollover_memory_mb: Option<usize>,
    /// MySQL replica server-id for the binlog connection (default 4271; must be
    /// distinct from the source's and any other replica).
    pub server_id: Option<u32>,
    /// PostgreSQL logical replication slot name (default `rivet_slot`).
    pub slot: Option<String>,
    /// SQL Server CDC capture instance, e.g. `dbo_orders` — required for
    /// `sqlserver://` sources.
    pub capture_instance: Option<String>,
}
485
/// Representation of [`ExportConfig::time_column`]. Written in YAML as
/// `timestamp` (the default for that field) or `unix`
/// (`rename_all = "lowercase"`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    /// NOTE(review): presumably a native SQL timestamp column — confirm.
    Timestamp,
    /// NOTE(review): presumably a numeric Unix-epoch column; confirm the unit
    /// (seconds vs. milliseconds).
    Unix,
}
492
/// Calendar bucket width for date/timestamp output partitioning
/// ([`ExportConfig::partition_by`]). The partition column must be a DATE or
/// TIMESTAMP column; this picks how its range is split into contiguous Hive
/// buckets. It is not a knob for partitioning by arbitrary column values.
///
/// Written in YAML as `day`, `month`, or `year` (`rename_all = "lowercase"`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PartitionGranularity {
    /// One bucket per calendar day (`col=2023-01-01/`). Default.
    #[default]
    Day,
    /// One bucket per calendar month (`col=2023-01/`).
    Month,
    /// One bucket per calendar year (`col=2023/`).
    Year,
}
508
/// Baseline [`ExportConfig`] shared by tests throughout the crate.
///
/// Every field is spelled out here, in declaration order, so a newly added
/// field needs exactly one edit (the compiler rejects this literal until the
/// field is listed). Tests start from this value and override only what they
/// exercise instead of writing the whole struct by hand — see the
/// `plan::build` and `preflight` tests.
///
/// The baseline is a `full`-mode Parquet export of `SELECT 1` to the local
/// path `/tmp`, uncompressed, with every optional knob unset.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    // Local destination rooted at /tmp; everything else left at its default.
    let destination = crate::config::DestinationConfig {
        destination_type: crate::config::DestinationType::Local,
        path: Some("/tmp".into()),
        ..Default::default()
    };

    ExportConfig {
        name: name.to_owned(),
        query: Some(String::from("SELECT 1")),
        query_file: None,
        table: None,
        mode: default_mode(),
        cdc: None,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),
        chunk_column: None,
        chunk_dense: false,
        chunk_size: default_chunk_size(),
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: default_parallel(),
        time_column: None,
        time_column_type: default_time_column_type(),
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::default(),
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination,
        verify: VerifyMode::default(),
        meta_columns: MetaColumns {
            exported_at: false,
            row_hash: false,
        },
        quality: None,
        max_file_size: None,
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
566
/// Unit tests for [`ExportConfig::max_file_size_bytes`] and
/// [`ExportConfig::resolve_query`] (inline query, `query_file`, the `table:`
/// shortcut, and the `query_file` path-traversal guards).
#[cfg(test)]
mod tests {
    use super::*;

    // ── ExportConfig::max_file_size_bytes ───────────────────────────────────

    /// Parse an `ExportConfig` from a minimal YAML document (inline
    /// `SELECT 1`, Parquet, local `/tmp` destination) with `extra` appended
    /// verbatim as additional top-level keys. Panics if the YAML does not parse.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n  type: local\n  path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    // An unparseable size is swallowed by `max_file_size_bytes` and reported as `None`.
    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // ── ExportConfig::resolve_query ─────────────────────────────────────────

    // Build a minimal ExportConfig directly, bypassing Config::from_yaml validation.
    // This lets us test the four branches inside resolve_query itself, including
    // the (both-set / neither-set) error paths that are normally prevented by the
    // top-level validator.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    /// Build an owned parameter map from borrowed `(key, value)` pairs.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // SAFETY: test-only; no other test in this module reads or writes this
        // variable.
        // NOTE(review): the previous justification claimed the test binary is
        // single-threaded, but the default Rust test harness runs tests on
        // multiple threads — confirm nothing else touches the process
        // environment concurrently.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // ── `table:` shortcut ───────────────────────────────────────────────────

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    // Covers injection attempts plus malformed identifiers: quoting, a leading
    // digit, the empty string, and empty dot-separated segments.
    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    // `resolve_query` checks `table` first, so it wins even when `query` is also set.
    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // ── SecOps: query_file path traversal prevention ──────────────────────────

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    // NOTE(review): `/etc/passwd` is only an absolute path on Unix-like
    // targets — confirm whether this test is expected to run on Windows.
    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}
831}