Skip to main content

rivet/config/
export.rs

1//! Per-export configuration: query/table/mode, chunking, format, destination link.
2//!
3//! `SchemaDriftPolicy` lives here because it is only ever read via
4//! [`ExportConfig::on_schema_drift`].
5
6use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
17/// What to do when structural schema drift is detected (column added, removed, or retyped).
18///
19/// ```yaml
20/// exports:
21///   - name: orders
22///     on_schema_drift: fail   # warn (default), continue, fail
23/// ```
24/// How deep `--validate` must verify each part's integrity.
25#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
26#[serde(rename_all = "snake_case")]
27pub enum VerifyMode {
28    /// Accept size-only verification when no content checksum is available.
29    #[default]
30    Size,
31    /// Require every part's content to be MD5-verified against the store's
32    /// listing; fail validation for any part that is only size-verified.
33    Content,
34}
35
36impl VerifyMode {
37    /// Whether content (not just size) verification is required.
38    pub fn requires_content(self) -> bool {
39        matches!(self, VerifyMode::Content)
40    }
41}
42
43#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
44#[serde(rename_all = "snake_case")]
45pub enum SchemaDriftPolicy {
46    /// Log a warning and continue. The new schema fingerprint is stored. (Default.)
47    #[default]
48    Warn,
49    /// Silently accept schema changes — store the new schema, no log output.
50    Continue,
51    /// Abort the run with a non-zero exit. The schema store is NOT updated so the
52    /// next run will detect the same change again.
53    Fail,
54}
55#[derive(Debug, Deserialize, JsonSchema, Clone)]
56#[serde(deny_unknown_fields)]
57pub struct ExportConfig {
58    pub name: String,
59    #[serde(default)]
60    pub query: Option<String>,
61    pub query_file: Option<String>,
62    /// Shortcut for `query: "SELECT * FROM <schema>.<table>"`.
63    ///
64    /// Accepts `table` or `schema.table` with ASCII-only identifiers
65    /// (`[A-Za-z_][A-Za-z0-9_]*`). Generates an unquoted single-table
66    /// query so the Postgres NUMERIC catalog-hint resolver recognises it
67    /// and auto-types `numeric(p,s)` columns without manual overrides.
68    ///
69    /// Mutually exclusive with `query` and `query_file`.
70    #[serde(default)]
71    pub table: Option<String>,
72    #[serde(default = "default_mode")]
73    pub mode: ExportMode,
74    pub cursor_column: Option<String>,
75    /// Secondary column for [`IncrementalCursorMode::Coalesce`] only (see ADR-0007).
76    #[serde(default)]
77    pub cursor_fallback_column: Option<String>,
78    /// How primary (and optional fallback) columns drive incremental progression.
79    #[serde(default)]
80    pub incremental_cursor_mode: IncrementalCursorMode,
81    pub chunk_column: Option<String>,
82    #[serde(default)]
83    pub chunk_dense: bool,
84    #[serde(default = "default_chunk_size")]
85    pub chunk_size: usize,
86    /// Target memory budget per chunk in MB. When set, `chunk_size` is derived
87    /// from this budget at plan-build time using a `pg_class` row-size estimate
88    /// (`pg_relation_size / reltuples`), clamped to `[10_000, 5_000_000]` rows.
89    ///
90    /// Mutually exclusive with an explicit non-default `chunk_size:`. Only
91    /// applies to `mode: chunked` on a Postgres source using the `table:`
92    /// shortcut (the row-size probe needs a known relation).
93    ///
94    /// ```yaml
95    /// exports:
96    ///   - name: page_views
97    ///     table: public.page_views
98    ///     mode: chunked
99    ///     chunk_size_memory_mb: 256
100    /// ```
101    #[serde(default)]
102    pub chunk_size_memory_mb: Option<u64>,
103    /// Divide the column range into exactly this many equal chunks.
104    /// Mutually exclusive with `chunk_dense` and `chunk_by_days`.
105    /// When set, `chunk_size` is computed dynamically from min/max.
106    pub chunk_count: Option<usize>,
107    pub chunk_by_days: Option<u32>,
108    /// Keyset (seek) pagination on this single index-backed unique key — the
109    /// source-safe shape for tables without a single-integer PK (OPT-4). The
110    /// column MUST be backed by a usable index (PK or unique); the planner
111    /// refuses a non-indexed key rather than emit a full-scan + filesort query.
112    pub chunk_by_key: Option<String>,
113    #[serde(default = "default_parallel")]
114    pub parallel: usize,
115    pub time_column: Option<String>,
116    #[serde(default = "default_time_column_type")]
117    pub time_column_type: TimeColumnType,
118    pub days_window: Option<u32>,
119
120    /// Value-based output partitioning: split this export's rows into one
121    /// destination sub-prefix per distinct bucket of this column's value
122    /// (Hive-style `col=value/` layout). The bucket width is
123    /// `partition_granularity`. Requires a `{partition}` token in
124    /// `destination.path` / `destination.prefix`.
125    ///
126    /// Orthogonal to `mode`: each partition runs the export's own mode, so
127    /// `mode: chunked` chunks *within* a day. Rows whose partition column is
128    /// NULL land in `col=__HIVE_DEFAULT_PARTITION__/` (Hive default partition)
129    /// so no row is silently dropped. Not compatible with `mode: time_window`.
130    ///
131    /// ```yaml
132    /// exports:
133    ///   - name: events
134    ///     table: events
135    ///     partition_by: created_at
136    ///     partition_granularity: day
137    ///     destination:
138    ///       type: s3
139    ///       bucket: my-bucket
140    ///       prefix: "events/{partition}/"   # → events/created_at=2023-01-01/
141    /// ```
142    #[serde(default)]
143    pub partition_by: Option<String>,
144
145    /// Bucket width for [`partition_by`](Self::partition_by). Default `day`.
146    #[serde(default)]
147    pub partition_granularity: PartitionGranularity,
148    pub format: FormatType,
149    #[serde(default)]
150    pub compression: CompressionType,
151    pub compression_level: Option<u32>,
152    pub compression_profile: Option<CompressionProfile>,
153    #[serde(default)]
154    pub skip_empty: bool,
155    pub destination: DestinationConfig,
156    /// Integrity depth required of `--validate` for this export's parts.
157    /// `size` (default) accepts size-only verification; `content` requires every
158    /// part's content MD5 to be checked against the store's listing (no
159    /// download) and **fails** validation for any part that could only be
160    /// size-verified — e.g. a part too large to upload as a single PUT (raise
161    /// `max_file_size` down so it fits), or a backend that exposes no checksum.
162    #[serde(default)]
163    pub verify: VerifyMode,
164    #[serde(default)]
165    pub meta_columns: MetaColumns,
166    #[serde(default)]
167    pub quality: Option<QualityConfig>,
168    pub max_file_size: Option<String>,
169    #[serde(default)]
170    pub chunk_checkpoint: bool,
171    pub chunk_max_attempts: Option<u32>,
172    #[serde(default)]
173    pub tuning: Option<TuningConfig>,
174    /// Optional logical group for shared source capacity (replica, host). Advisory prioritization only.
175    #[serde(default)]
176    pub source_group: Option<String>,
177    /// Hint (Epic C / ADR-0006) that this export should always be treated as reconcile-heavy
178    /// by planning, independent of the `--reconcile` CLI flag. Advisory only.
179    #[serde(default)]
180    pub reconcile_required: bool,
181
182    /// Per-column type overrides (roadmap §8). Keys are column names; values
183    /// are short type strings such as `decimal(18,2)`, `timestamp_tz`, `json`.
184    ///
185    /// ```yaml
186    /// exports:
187    ///   - name: payments
188    ///     columns:
189    ///       amount: decimal(18,2)
190    ///       fee: decimal(18,6)
191    ///       created_at: timestamp_tz
192    /// ```
193    ///
194    /// Overrides take priority over autodetection and are validated at
195    /// plan time — an invalid type string fails before the export runs.
196    #[serde(default)]
197    pub columns: std::collections::HashMap<String, String>,
198
199    /// Downstream warehouse this export targets (`bigquery` / `bq`,
200    /// `duckdb`). When set, `rivet check --type-report` resolves each column
201    /// against it (native type, honest autoload type, recovery hint) without
202    /// needing `--target` on the CLI — the CLI flag still wins when both are
203    /// present. The Parquet interchange stays target-neutral (ADR-0014 T2);
204    /// `target:` only drives guidance and the future load-schema artifact.
205    ///
206    /// ```yaml
207    /// exports:
208    ///   - name: payments
209    ///     target: bigquery
210    /// ```
211    #[serde(default)]
212    pub target: Option<String>,
213
214    /// Policy applied when structural schema drift is detected (column added, removed, or retyped).
215    /// Defaults to `warn`: log a warning and continue.
216    #[serde(default)]
217    pub on_schema_drift: SchemaDriftPolicy,
218
219    /// Growth-factor threshold for data shape drift warnings (Epic 8).
220    /// When a string/binary column's max observed byte length in the current run
221    /// exceeds `stored_max * shape_drift_warn_factor`, Rivet logs a warning.
222    /// `None` uses the default of 2.0. Set to `0.0` to disable shape tracking.
223    #[serde(default)]
224    pub shape_drift_warn_factor: Option<f64>,
225
226    /// Parquet row group tuning. Only meaningful when `format: parquet`.
227    /// When absent, the parquet library default (1,048,576 rows/group) is used.
228    #[serde(default)]
229    pub parquet: Option<ParquetConfig>,
230}
231
232impl ExportConfig {
233    /// Resolve the effective `(CompressionType, level)` for this export.
234    /// `compression_profile` takes precedence over `compression` + `compression_level`.
235    pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
236        if let Some(profile) = self.compression_profile {
237            profile.to_codec()
238        } else {
239            (self.compression, self.compression_level)
240        }
241    }
242
243    pub fn max_file_size_bytes(&self) -> Option<u64> {
244        self.max_file_size
245            .as_ref()
246            .and_then(|s| parse_file_size(s).ok())
247    }
248
249    pub fn resolve_query(
250        &self,
251        config_dir: &Path,
252        params: Option<&std::collections::HashMap<String, String>>,
253    ) -> crate::error::Result<String> {
254        // table: shortcut takes precedence — already validated by
255        // `validate_business_rules` to be mutually exclusive with query/query_file.
256        if let Some(tbl) = &self.table {
257            validate_table_shortcut_ident(&self.name, tbl)?;
258            return Ok(format!("SELECT * FROM {tbl}"));
259        }
260        match (&self.query, &self.query_file) {
261            (Some(q), None) => {
262                if params.is_some() {
263                    resolve_vars(q, params)
264                } else {
265                    Ok(q.clone())
266                }
267            }
268            (None, Some(file)) => {
269                let file_path = std::path::Path::new(file);
270                // SecOps: block absolute paths and `..` traversal components.
271                if file_path.is_absolute() {
272                    anyhow::bail!(
273                        "export '{}': query_file must be a relative path: '{}'",
274                        self.name,
275                        file
276                    );
277                }
278                if file_path
279                    .components()
280                    .any(|c| c == std::path::Component::ParentDir)
281                {
282                    anyhow::bail!(
283                        "export '{}': query_file path must not contain '..': '{}'",
284                        self.name,
285                        file
286                    );
287                }
288                let joined = config_dir.join(file);
289                // Canonicalize-based check catches symlink-based evasion for files
290                // that already exist on disk.
291                if let Ok(canonical) = joined.canonicalize() {
292                    let base = config_dir
293                        .canonicalize()
294                        .unwrap_or_else(|_| config_dir.to_path_buf());
295                    if !canonical.starts_with(&base) {
296                        anyhow::bail!(
297                            "export '{}': query_file '{}' resolves outside the config directory",
298                            self.name,
299                            file
300                        );
301                    }
302                }
303                let raw = std::fs::read_to_string(&joined)?;
304                resolve_vars(&raw, params)
305            }
306            (Some(_), Some(_)) => {
307                anyhow::bail!(
308                    "export '{}': specify either 'query' or 'query_file', not both",
309                    self.name
310                )
311            }
312            (None, None) => {
313                anyhow::bail!(
314                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
315                    self.name
316                )
317            }
318        }
319    }
320}
321
322/// Validate the value of the `table:` YAML shortcut.
323///
324/// Accepts ASCII identifiers in the form `<table>` or `<schema>.<table>`. Each
325/// segment must match `[A-Za-z_][A-Za-z0-9_]*`. Anything else (quoted
326/// identifiers, exotic chars, three-part names, SQL injection attempts) is
327/// rejected — the user should fall back to `query:` for those cases.
328///
329/// The bound on identifier shape keeps generated SQL safe to interpolate
330/// without quoting and ensures the generated `SELECT * FROM <ident>` form is
331/// recognised by the PG catalog-hint parser ([src/source/postgres.rs]).
332fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
333    let trimmed = raw.trim();
334    if trimmed.is_empty() {
335        anyhow::bail!("export '{export_name}': 'table' is empty");
336    }
337    let parts: Vec<&str> = trimmed.split('.').collect();
338    if parts.len() > 2 {
339        anyhow::bail!(
340            "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
341        );
342    }
343    for part in &parts {
344        if part.is_empty() {
345            anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
346        }
347        let mut chars = part.chars();
348        let first = chars.next().unwrap();
349        if !(first.is_ascii_alphabetic() || first == '_') {
350            anyhow::bail!(
351                "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
352            );
353        }
354        if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
355            anyhow::bail!(
356                "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
357            );
358        }
359    }
360    Ok(())
361}
362
363#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
364#[serde(deny_unknown_fields)]
365pub struct QualityConfig {
366    pub row_count_min: Option<usize>,
367    pub row_count_max: Option<usize>,
368    #[serde(default)]
369    pub null_ratio_max: std::collections::HashMap<String, f64>,
370    #[serde(default)]
371    pub unique_columns: Vec<String>,
372    /// Cap on the number of distinct values tracked per column during uniqueness checks.
373    /// When the limit is hit, a Warn issue is emitted and tracking stops for that column.
374    /// Prevents unbounded HashSet growth on high-cardinality columns.
375    pub unique_max_entries: Option<usize>,
376}
377
378#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
379#[serde(deny_unknown_fields)]
380pub struct MetaColumns {
381    #[serde(default)]
382    pub exported_at: bool,
383    #[serde(default)]
384    pub row_hash: bool,
385}
386
387fn default_mode() -> ExportMode {
388    ExportMode::Full
389}
390
/// Serde default for `ExportConfig::chunk_size`: 100,000 rows.
fn default_chunk_size() -> usize {
    const ROWS_PER_CHUNK: usize = 100 * 1_000;
    ROWS_PER_CHUNK
}

/// Serde default for `ExportConfig::parallel`: `1`.
fn default_parallel() -> usize {
    const SERIAL: usize = 1;
    SERIAL
}
398
399fn default_time_column_type() -> TimeColumnType {
400    TimeColumnType::Timestamp
401}
402
403#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
404#[serde(rename_all = "snake_case")]
405pub enum ExportMode {
406    Full,
407    Incremental,
408    Chunked,
409    TimeWindow,
410}
411
412#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
413#[serde(rename_all = "lowercase")]
414pub enum TimeColumnType {
415    Timestamp,
416    Unix,
417}
418
419/// Bucket width for value-based output partitioning ([`ExportConfig::partition_by`]).
420#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
421#[serde(rename_all = "lowercase")]
422pub enum PartitionGranularity {
423    #[default]
424    Day,
425    Month,
426    Year,
427}
428
/// Baseline [`ExportConfig`] shared by tests across the crate.
///
/// Every field is spelled out here, in declaration order, so a newly added
/// field makes the compiler flag exactly this literal. Tests start from this
/// value and override only what they exercise (struct-update syntax) instead
/// of writing the whole struct — see the `plan::build` and `preflight` tests.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    let local_tmp = crate::config::DestinationConfig {
        destination_type: crate::config::DestinationType::Local,
        path: Some("/tmp".into()),
        ..Default::default()
    };
    ExportConfig {
        name: name.to_owned(),
        query: Some("SELECT 1".into()),
        query_file: None,
        table: None,
        mode: ExportMode::Full,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination: local_tmp,
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
485
#[cfg(test)]
mod tests {
    use super::*;

    // ── ExportConfig::max_file_size_bytes ───────────────────────────────────

    /// Parse a minimal export from YAML, appending `extra` (`key: value` lines).
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n  type: local\n  path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // ── serde defaults / VerifyMode ─────────────────────────────────────────

    #[test]
    fn omitted_fields_take_documented_defaults() {
        let exp = make_export_yaml("defaults", "");
        assert_eq!(exp.mode, ExportMode::Full);
        assert_eq!(exp.chunk_size, 100_000);
        assert_eq!(exp.parallel, 1);
        assert_eq!(exp.time_column_type, TimeColumnType::Timestamp);
        assert_eq!(exp.verify, VerifyMode::Size);
        assert_eq!(exp.on_schema_drift, SchemaDriftPolicy::Warn);
    }

    #[test]
    fn verify_content_parses_from_yaml() {
        let exp = make_export_yaml("strict", "verify: content\n");
        assert_eq!(exp.verify, VerifyMode::Content);
    }

    #[test]
    fn verify_mode_requires_content_only_for_content() {
        assert!(VerifyMode::Content.requires_content());
        assert!(!VerifyMode::Size.requires_content());
    }

    // ── ExportConfig::resolve_query ─────────────────────────────────────────

    // Build a minimal ExportConfig directly, bypassing Config::from_yaml validation.
    // This lets us test the four branches inside resolve_query itself, including
    // the (both-set / neither-set) error paths that are normally prevented by the
    // top-level validator.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // No `std::env::remove_var` here: libtest runs tests on several threads,
        // so mutating the process environment from a test is not sound. The
        // variable name is unique to this test; if an outer environment defines
        // it anyway, skip rather than race other threads by unsetting it.
        const VAR: &str = "UNSET_RIVET_TEST_VAR";
        if std::env::var_os(VAR).is_some() {
            eprintln!("skipping: {VAR} is set in the test environment");
            return;
        }
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // ── `table:` shortcut ───────────────────────────────────────────────────

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_accepts_underscored_identifiers() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("_staging.order_items_2024".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM _staging.order_items_2024");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // ── SecOps: query_file path traversal prevention ──────────────────────────

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}