Skip to main content

rivet/config/
export.rs

1//! Per-export configuration: query/table/mode, chunking, format, destination link.
2//!
3//! `SchemaDriftPolicy` lives here because it is only ever read via
4//! [`ExportConfig::on_schema_drift`].
5
6use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
17/// What to do when structural schema drift is detected (column added, removed, or retyped).
18///
19/// ```yaml
20/// exports:
21///   - name: orders
22///     on_schema_drift: fail   # warn (default), continue, fail
23/// ```
24/// How deep `--validate` must verify each part's integrity.
25#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
26#[serde(rename_all = "snake_case")]
27pub enum VerifyMode {
28    /// Accept size-only verification when no content checksum is available.
29    #[default]
30    Size,
31    /// Require every part's content to be MD5-verified against the store's
32    /// listing; fail validation for any part that is only size-verified.
33    Content,
34}
35
36impl VerifyMode {
37    /// Whether content (not just size) verification is required.
38    pub fn requires_content(self) -> bool {
39        matches!(self, VerifyMode::Content)
40    }
41}
42
43#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
44#[serde(rename_all = "snake_case")]
45pub enum SchemaDriftPolicy {
46    /// Log a warning and continue. The new schema fingerprint is stored. (Default.)
47    #[default]
48    Warn,
49    /// Silently accept schema changes — store the new schema, no log output.
50    Continue,
51    /// Abort the run with a non-zero exit. The schema store is NOT updated so the
52    /// next run will detect the same change again.
53    Fail,
54}
55#[derive(Debug, Deserialize, JsonSchema, Clone)]
56#[serde(deny_unknown_fields)]
57pub struct ExportConfig {
58    pub name: String,
59    #[serde(default)]
60    pub query: Option<String>,
61    pub query_file: Option<String>,
62    /// Shortcut for `query: "SELECT * FROM <schema>.<table>"`.
63    ///
64    /// Accepts `table` or `schema.table` with ASCII-only identifiers
65    /// (`[A-Za-z_][A-Za-z0-9_]*`). Generates an unquoted single-table
66    /// query so the Postgres NUMERIC catalog-hint resolver recognises it
67    /// and auto-types `numeric(p,s)` columns without manual overrides.
68    ///
69    /// Mutually exclusive with `query` and `query_file`.
70    #[serde(default)]
71    pub table: Option<String>,
72    #[serde(default = "default_mode")]
73    pub mode: ExportMode,
74    pub cursor_column: Option<String>,
75    /// Secondary column for [`IncrementalCursorMode::Coalesce`] only (see ADR-0007).
76    #[serde(default)]
77    pub cursor_fallback_column: Option<String>,
78    /// How primary (and optional fallback) columns drive incremental progression.
79    #[serde(default)]
80    pub incremental_cursor_mode: IncrementalCursorMode,
81    pub chunk_column: Option<String>,
82    #[serde(default)]
83    pub chunk_dense: bool,
84    #[serde(default = "default_chunk_size")]
85    pub chunk_size: usize,
86    /// Target memory budget per chunk in MB. When set, `chunk_size` is derived
87    /// from this budget at plan-build time using a `pg_class` row-size estimate
88    /// (`pg_relation_size / reltuples`), clamped to `[10_000, 5_000_000]` rows.
89    ///
90    /// Mutually exclusive with an explicit non-default `chunk_size:`. Only
91    /// applies to `mode: chunked` on a Postgres source using the `table:`
92    /// shortcut (the row-size probe needs a known relation).
93    ///
94    /// ```yaml
95    /// exports:
96    ///   - name: page_views
97    ///     table: public.page_views
98    ///     mode: chunked
99    ///     chunk_size_memory_mb: 256
100    /// ```
101    #[serde(default)]
102    pub chunk_size_memory_mb: Option<u64>,
103    /// Divide the column range into exactly this many equal chunks.
104    /// Mutually exclusive with `chunk_dense` and `chunk_by_days`.
105    /// When set, `chunk_size` is computed dynamically from min/max.
106    pub chunk_count: Option<usize>,
107    pub chunk_by_days: Option<u32>,
108    /// Keyset (seek) pagination on this single index-backed unique key — the
109    /// source-safe shape for tables without a single-integer PK (OPT-4). The
110    /// column MUST be backed by a usable index (PK or unique); the planner
111    /// refuses a non-indexed key rather than emit a full-scan + filesort query.
112    pub chunk_by_key: Option<String>,
113    #[serde(default = "default_parallel")]
114    pub parallel: usize,
115    pub time_column: Option<String>,
116    #[serde(default = "default_time_column_type")]
117    pub time_column_type: TimeColumnType,
118    pub days_window: Option<u32>,
119
120    /// Date/time output partitioning: split this export's rows into one
121    /// destination sub-prefix per calendar bucket of this **DATE or TIMESTAMP**
122    /// column, bucketed by [`partition_granularity`](Self::partition_granularity)
123    /// (`day` / `month` / `year`), in a Hive-style `col=value/` layout
124    /// (`created_at=2023-01-01/`, `created_at=2023-01/`, `created_at=2023/`).
125    /// Requires a `{partition}` token in `destination.path` /
126    /// `destination.prefix`.
127    ///
128    /// This is **not** arbitrary value partitioning: the column's min/max is
129    /// read and parsed as a date to generate contiguous calendar buckets, so a
130    /// non-temporal column (e.g. `partition_by: status`) fails at run time with
131    /// "could not parse partition min '<value>' from column '<col>' as a date".
132    /// To split by a categorical column, write one export per value with a
133    /// `WHERE` filter instead.
134    ///
135    /// Orthogonal to `mode`: each partition runs the export's own mode, so
136    /// `mode: chunked` chunks *within* a day. Rows whose partition column is
137    /// NULL land in `col=__HIVE_DEFAULT_PARTITION__/` (Hive default partition)
138    /// so no row is silently dropped. Not compatible with `mode: time_window`.
139    ///
140    /// ```yaml
141    /// exports:
142    ///   - name: events
143    ///     table: events
144    ///     partition_by: created_at        # must be a DATE or TIMESTAMP column
145    ///     partition_granularity: day
146    ///     destination:
147    ///       type: s3
148    ///       bucket: my-bucket
149    ///       prefix: "events/{partition}/"   # → events/created_at=2023-01-01/
150    /// ```
151    #[serde(default)]
152    pub partition_by: Option<String>,
153
154    /// Calendar bucket width for [`partition_by`](Self::partition_by):
155    /// `day` (default), `month`, or `year`. Determines how the partition
156    /// column's date/timestamp range is split into contiguous Hive buckets
157    /// (`col=2023-01-01/` / `col=2023-01/` / `col=2023/`). Has no effect
158    /// unless `partition_by` is set.
159    #[serde(default)]
160    pub partition_granularity: PartitionGranularity,
161    pub format: FormatType,
162    #[serde(default)]
163    pub compression: CompressionType,
164    pub compression_level: Option<u32>,
165    pub compression_profile: Option<CompressionProfile>,
166    #[serde(default)]
167    pub skip_empty: bool,
168    pub destination: DestinationConfig,
169    /// Integrity depth required of `--validate` for this export's parts.
170    /// `size` (default) accepts size-only verification; `content` requires every
171    /// part's content MD5 to be checked against the store's listing (no
172    /// download) and **fails** validation for any part that could only be
173    /// size-verified — e.g. a part too large to upload as a single PUT (raise
174    /// `max_file_size` down so it fits), or a backend that exposes no checksum.
175    #[serde(default)]
176    pub verify: VerifyMode,
177    #[serde(default)]
178    pub meta_columns: MetaColumns,
179    #[serde(default)]
180    pub quality: Option<QualityConfig>,
181    /// Rotate to a new part when the current file reaches this size.
182    /// Accepts `B`/`KB`/`MB`/`GB` (case-insensitive) or a bare byte count;
183    /// a fractional value is allowed (`1.5GB`). Units are binary (IEC-style):
184    /// `KB` = 1024 bytes, `MB` = 1024 KB, `GB` = 1024 MB. Example: `256MB`.
185    pub max_file_size: Option<String>,
186    #[serde(default)]
187    pub chunk_checkpoint: bool,
188    pub chunk_max_attempts: Option<u32>,
189    #[serde(default)]
190    pub tuning: Option<TuningConfig>,
191    /// Optional logical group for shared source capacity (replica, host). Advisory prioritization only.
192    #[serde(default)]
193    pub source_group: Option<String>,
194    /// Hint (Epic C / ADR-0006) that this export should always be treated as reconcile-heavy
195    /// by planning, independent of the `--reconcile` CLI flag. Advisory only.
196    #[serde(default)]
197    pub reconcile_required: bool,
198
199    /// Per-column type overrides (roadmap §8). Keys are column names; values
200    /// are short type strings such as `decimal(18,2)`, `timestamp_tz`, `json`.
201    ///
202    /// ```yaml
203    /// exports:
204    ///   - name: payments
205    ///     columns:
206    ///       amount: decimal(18,2)
207    ///       fee: decimal(18,6)
208    ///       created_at: timestamp_tz
209    /// ```
210    ///
211    /// Overrides take priority over autodetection and are validated at
212    /// plan time — an invalid type string fails before the export runs.
213    #[serde(default)]
214    pub columns: std::collections::HashMap<String, String>,
215
216    /// Downstream warehouse this export targets (`bigquery` / `bq`,
217    /// `duckdb`). When set, `rivet check --type-report` resolves each column
218    /// against it (native type, honest autoload type, recovery hint) without
219    /// needing `--target` on the CLI — the CLI flag still wins when both are
220    /// present. The Parquet interchange stays target-neutral (ADR-0014 T2);
221    /// `target:` only drives guidance and the future load-schema artifact.
222    ///
223    /// ```yaml
224    /// exports:
225    ///   - name: payments
226    ///     target: bigquery
227    /// ```
228    #[serde(default)]
229    pub target: Option<String>,
230
231    /// Policy applied when structural schema drift is detected (column added, removed, or retyped).
232    /// Defaults to `warn`: log a warning and continue.
233    #[serde(default)]
234    pub on_schema_drift: SchemaDriftPolicy,
235
236    /// Growth-factor threshold for data shape drift warnings (Epic 8).
237    /// When a string/binary column's max observed byte length in the current run
238    /// exceeds `stored_max * shape_drift_warn_factor`, Rivet logs a warning.
239    /// `None` uses the default of 2.0. Set to `0.0` to disable shape tracking.
240    #[serde(default)]
241    pub shape_drift_warn_factor: Option<f64>,
242
243    /// Parquet row group tuning. Only meaningful when `format: parquet`.
244    /// When absent, the parquet library default (1,048,576 rows/group) is used.
245    #[serde(default)]
246    pub parquet: Option<ParquetConfig>,
247}
248
249impl ExportConfig {
250    /// Resolve the effective `(CompressionType, level)` for this export.
251    /// `compression_profile` takes precedence over `compression` + `compression_level`.
252    ///
253    /// L24: when a profile is set *and* a conflicting explicit codec/level was
254    /// written, warn once that the profile wins rather than silently dropping the
255    /// explicit choice. An explicit codec is only detectable when it differs from
256    /// the `#[serde(default)]` (Zstd) — a literal `compression: zstd` alongside a
257    /// profile is indistinguishable from an omitted field and stays silent.
258    pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
259        if let Some(profile) = self.compression_profile {
260            let explicit_codec =
261                (self.compression != CompressionType::default()).then_some(self.compression);
262            if let Some(msg) = super::format::compression_profile_override_warning(
263                profile,
264                explicit_codec,
265                self.compression_level,
266            ) {
267                log::warn!("export '{}': {}", self.name, msg);
268            }
269            profile.to_codec()
270        } else {
271            (self.compression, self.compression_level)
272        }
273    }
274
275    pub fn max_file_size_bytes(&self) -> Option<u64> {
276        self.max_file_size
277            .as_ref()
278            .and_then(|s| parse_file_size(s).ok())
279    }
280
281    pub fn resolve_query(
282        &self,
283        config_dir: &Path,
284        params: Option<&std::collections::HashMap<String, String>>,
285    ) -> crate::error::Result<String> {
286        // table: shortcut takes precedence — already validated by
287        // `validate_business_rules` to be mutually exclusive with query/query_file.
288        if let Some(tbl) = &self.table {
289            validate_table_shortcut_ident(&self.name, tbl)?;
290            return Ok(format!("SELECT * FROM {tbl}"));
291        }
292        match (&self.query, &self.query_file) {
293            (Some(q), None) => {
294                if params.is_some() {
295                    resolve_vars(q, params)
296                } else {
297                    Ok(q.clone())
298                }
299            }
300            (None, Some(file)) => {
301                let file_path = std::path::Path::new(file);
302                // SecOps: block absolute paths and `..` traversal components.
303                if file_path.is_absolute() {
304                    anyhow::bail!(
305                        "export '{}': query_file must be a relative path: '{}'",
306                        self.name,
307                        file
308                    );
309                }
310                if file_path
311                    .components()
312                    .any(|c| c == std::path::Component::ParentDir)
313                {
314                    anyhow::bail!(
315                        "export '{}': query_file path must not contain '..': '{}'",
316                        self.name,
317                        file
318                    );
319                }
320                let joined = config_dir.join(file);
321                // Canonicalize-based check catches symlink-based evasion for files
322                // that already exist on disk.
323                if let Ok(canonical) = joined.canonicalize() {
324                    let base = config_dir
325                        .canonicalize()
326                        .unwrap_or_else(|_| config_dir.to_path_buf());
327                    if !canonical.starts_with(&base) {
328                        anyhow::bail!(
329                            "export '{}': query_file '{}' resolves outside the config directory",
330                            self.name,
331                            file
332                        );
333                    }
334                }
335                let raw = std::fs::read_to_string(&joined)?;
336                resolve_vars(&raw, params)
337            }
338            (Some(_), Some(_)) => {
339                anyhow::bail!(
340                    "export '{}': specify either 'query' or 'query_file', not both",
341                    self.name
342                )
343            }
344            (None, None) => {
345                anyhow::bail!(
346                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
347                    self.name
348                )
349            }
350        }
351    }
352}
353
354/// Validate the value of the `table:` YAML shortcut.
355///
356/// Accepts ASCII identifiers in the form `<table>` or `<schema>.<table>`. Each
357/// segment must match `[A-Za-z_][A-Za-z0-9_]*`. Anything else (quoted
358/// identifiers, exotic chars, three-part names, SQL injection attempts) is
359/// rejected — the user should fall back to `query:` for those cases.
360///
361/// The bound on identifier shape keeps generated SQL safe to interpolate
362/// without quoting and ensures the generated `SELECT * FROM <ident>` form is
363/// recognised by the PG catalog-hint parser ([src/source/postgres.rs]).
364fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
365    let trimmed = raw.trim();
366    if trimmed.is_empty() {
367        anyhow::bail!("export '{export_name}': 'table' is empty");
368    }
369    let parts: Vec<&str> = trimmed.split('.').collect();
370    if parts.len() > 2 {
371        anyhow::bail!(
372            "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
373        );
374    }
375    for part in &parts {
376        if part.is_empty() {
377            anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
378        }
379        let mut chars = part.chars();
380        let first = chars.next().unwrap();
381        if !(first.is_ascii_alphabetic() || first == '_') {
382            anyhow::bail!(
383                "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
384            );
385        }
386        if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
387            anyhow::bail!(
388                "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
389            );
390        }
391    }
392    Ok(())
393}
394
395#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
396#[serde(deny_unknown_fields)]
397pub struct QualityConfig {
398    pub row_count_min: Option<usize>,
399    pub row_count_max: Option<usize>,
400    #[serde(default)]
401    pub null_ratio_max: std::collections::HashMap<String, f64>,
402    #[serde(default)]
403    pub unique_columns: Vec<String>,
404    /// Cap on the number of distinct values tracked per column during uniqueness checks.
405    /// When the limit is hit, a Warn issue is emitted and tracking stops for that column.
406    /// Prevents unbounded HashSet growth on high-cardinality columns.
407    pub unique_max_entries: Option<usize>,
408}
409
410#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
411#[serde(deny_unknown_fields)]
412pub struct MetaColumns {
413    #[serde(default)]
414    pub exported_at: bool,
415    #[serde(default)]
416    pub row_hash: bool,
417}
418
419fn default_mode() -> ExportMode {
420    ExportMode::Full
421}
422
/// Serde default for `ExportConfig::chunk_size` when the key is omitted:
/// 100 000 rows per chunk.
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_ROWS: usize = 100_000;
    DEFAULT_CHUNK_ROWS
}
426
/// Serde default for `ExportConfig::parallel` when the key is omitted: `1`.
fn default_parallel() -> usize {
    const DEFAULT_PARALLEL: usize = 1;
    DEFAULT_PARALLEL
}
430
431fn default_time_column_type() -> TimeColumnType {
432    TimeColumnType::Timestamp
433}
434
435#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
436#[serde(rename_all = "snake_case")]
437pub enum ExportMode {
438    Full,
439    Incremental,
440    Chunked,
441    TimeWindow,
442}
443
444#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
445#[serde(rename_all = "lowercase")]
446pub enum TimeColumnType {
447    Timestamp,
448    Unix,
449}
450
451/// Calendar bucket width for date/timestamp output partitioning
452/// ([`ExportConfig::partition_by`]). The partition column must be a DATE or
453/// TIMESTAMP column; this picks how its range is split into contiguous Hive
454/// buckets. It is not a knob for partitioning by arbitrary column values.
455#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
456#[serde(rename_all = "lowercase")]
457pub enum PartitionGranularity {
458    /// One bucket per calendar day (`col=2023-01-01/`). Default.
459    #[default]
460    Day,
461    /// One bucket per calendar month (`col=2023-01/`).
462    Month,
463    /// One bucket per calendar year (`col=2023/`).
464    Year,
465}
466
/// Shared test baseline: an [`ExportConfig`] with every field spelled out.
///
/// Because the literal names all fields, the compiler rejects it as soon as a
/// field is added to the struct, so this is the single place to update. Tests
/// start from this value and override only what they exercise (struct-update
/// syntax) instead of hand-writing the full struct — see the `plan::build`
/// and `preflight` tests.
///
/// Fields are listed in the same order as the struct declaration.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    // Local destination rooted at `/tmp`; everything else stays at its default.
    let destination = DestinationConfig {
        destination_type: crate::config::DestinationType::Local,
        path: Some("/tmp".into()),
        ..Default::default()
    };

    ExportConfig {
        name: name.to_owned(),
        query: Some(String::from("SELECT 1")),
        query_file: None,
        table: None,
        mode: ExportMode::Full,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination,
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
523
#[cfg(test)]
mod tests {
    use super::*;

    // ── Shared helpers ──────────────────────────────────────────────────────

    /// Parse a minimal YAML export (inline query, parquet, local `/tmp`
    /// destination) with `extra` appended as additional top-level keys.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n  type: local\n  path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    // Build a minimal ExportConfig directly, bypassing Config::from_yaml validation.
    // This lets us test the four branches inside resolve_query itself, including
    // the (both-set / neither-set) error paths that are normally prevented by the
    // top-level validator.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    /// Turn `(key, value)` pairs into the owned map `resolve_query` takes.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    // ── ExportConfig::max_file_size_bytes ───────────────────────────────────

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // ── ExportConfig::effective_compression ─────────────────────────────────

    #[test]
    fn effective_compression_without_profile_uses_explicit_fields() {
        let mut exp = sample_export("codec");
        exp.compression_level = Some(3);
        let (codec, level) = exp.effective_compression();
        // `assert!` + `==` rather than `assert_eq!`: only `PartialEq` is
        // known to be implemented for `CompressionType` from this module.
        assert!(codec == CompressionType::None);
        assert_eq!(level, Some(3));
    }

    // ── ExportConfig::resolve_query ─────────────────────────────────────────

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // Do not mutate the process environment here: libtest runs tests on
        // several threads of one process, so `std::env::remove_var` cannot be
        // called soundly. Check the precondition instead, in case
        // `resolve_vars` consults the environment for unknown placeholders.
        const VAR: &str = "UNSET_RIVET_TEST_VAR";
        assert!(
            std::env::var_os(VAR).is_none(),
            "test precondition: {VAR} must not be set in the environment"
        );
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        // Pin the wording of this specific branch; a bare "query" substring
        // would also match unrelated `query_file` errors.
        assert!(msg.contains("exactly one"), "got: {msg}");
    }

    // ── `table:` shortcut ───────────────────────────────────────────────────

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    // ── SecOps: query_file path traversal prevention ──────────────────────────

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}
788}