Skip to main content

rivet/config/
export.rs

//! Per-export configuration: query/table/mode, chunking, format, destination link.
//!
//! `SchemaDriftPolicy` lives here because it is only ever read via
//! [`ExportConfig::on_schema_drift`]; `VerifyMode` is likewise configured per
//! export, via [`ExportConfig::verify`].
5
6use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
17/// What to do when structural schema drift is detected (column added, removed, or retyped).
18///
19/// ```yaml
20/// exports:
21///   - name: orders
22///     on_schema_drift: fail   # warn (default), continue, fail
23/// ```
24/// How deep `--validate` must verify each part's integrity.
25#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
26#[serde(rename_all = "snake_case")]
27pub enum VerifyMode {
28    /// Accept size-only verification when no content checksum is available.
29    #[default]
30    Size,
31    /// Require every part's content to be MD5-verified against the store's
32    /// listing; fail validation for any part that is only size-verified.
33    Content,
34}
35
36impl VerifyMode {
37    /// Whether content (not just size) verification is required.
38    pub fn requires_content(self) -> bool {
39        matches!(self, VerifyMode::Content)
40    }
41}
42
43#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
44#[serde(rename_all = "snake_case")]
45pub enum SchemaDriftPolicy {
46    /// Log a warning and continue. The new schema fingerprint is stored. (Default.)
47    #[default]
48    Warn,
49    /// Silently accept schema changes — store the new schema, no log output.
50    Continue,
51    /// Abort the run with a non-zero exit. The schema store is NOT updated so the
52    /// next run will detect the same change again.
53    Fail,
54}
55#[derive(Debug, Deserialize, JsonSchema, Clone)]
56#[serde(deny_unknown_fields)]
57pub struct ExportConfig {
58    pub name: String,
59    #[serde(default)]
60    pub query: Option<String>,
61    pub query_file: Option<String>,
62    /// Shortcut for `query: "SELECT * FROM <schema>.<table>"`.
63    ///
64    /// Accepts `table` or `schema.table` with ASCII-only identifiers
65    /// (`[A-Za-z_][A-Za-z0-9_]*`). Generates an unquoted single-table
66    /// query so the Postgres NUMERIC catalog-hint resolver recognises it
67    /// and auto-types `numeric(p,s)` columns without manual overrides.
68    ///
69    /// Mutually exclusive with `query` and `query_file`.
70    #[serde(default)]
71    pub table: Option<String>,
72    #[serde(default = "default_mode")]
73    pub mode: ExportMode,
74    pub cursor_column: Option<String>,
75    /// Secondary column for [`IncrementalCursorMode::Coalesce`] only (see ADR-0007).
76    #[serde(default)]
77    pub cursor_fallback_column: Option<String>,
78    /// How primary (and optional fallback) columns drive incremental progression.
79    #[serde(default)]
80    pub incremental_cursor_mode: IncrementalCursorMode,
81    pub chunk_column: Option<String>,
82    #[serde(default)]
83    pub chunk_dense: bool,
84    #[serde(default = "default_chunk_size")]
85    pub chunk_size: usize,
86    /// Target memory budget per chunk in MB. When set, `chunk_size` is derived
87    /// from this budget at plan-build time using a `pg_class` row-size estimate
88    /// (`pg_relation_size / reltuples`), clamped to `[10_000, 5_000_000]` rows.
89    ///
90    /// Mutually exclusive with an explicit non-default `chunk_size:`. Only
91    /// applies to `mode: chunked` on a Postgres source using the `table:`
92    /// shortcut (the row-size probe needs a known relation).
93    ///
94    /// ```yaml
95    /// exports:
96    ///   - name: page_views
97    ///     table: public.page_views
98    ///     mode: chunked
99    ///     chunk_size_memory_mb: 256
100    /// ```
101    #[serde(default)]
102    pub chunk_size_memory_mb: Option<u64>,
103    /// Divide the column range into exactly this many equal chunks.
104    /// Mutually exclusive with `chunk_dense` and `chunk_by_days`.
105    /// When set, `chunk_size` is computed dynamically from min/max.
106    pub chunk_count: Option<usize>,
107    pub chunk_by_days: Option<u32>,
108    /// Keyset (seek) pagination on this single index-backed unique key — the
109    /// source-safe shape for tables without a single-integer PK (OPT-4). The
110    /// column MUST be backed by a usable index (PK or unique); the planner
111    /// refuses a non-indexed key rather than emit a full-scan + filesort query.
112    pub chunk_by_key: Option<String>,
113    #[serde(default = "default_parallel")]
114    pub parallel: usize,
115    pub time_column: Option<String>,
116    #[serde(default = "default_time_column_type")]
117    pub time_column_type: TimeColumnType,
118    pub days_window: Option<u32>,
119    pub format: FormatType,
120    #[serde(default)]
121    pub compression: CompressionType,
122    pub compression_level: Option<u32>,
123    pub compression_profile: Option<CompressionProfile>,
124    #[serde(default)]
125    pub skip_empty: bool,
126    pub destination: DestinationConfig,
127    /// Integrity depth required of `--validate` for this export's parts.
128    /// `size` (default) accepts size-only verification; `content` requires every
129    /// part's content MD5 to be checked against the store's listing (no
130    /// download) and **fails** validation for any part that could only be
131    /// size-verified — e.g. a part too large to upload as a single PUT (raise
132    /// `max_file_size` down so it fits), or a backend that exposes no checksum.
133    #[serde(default)]
134    pub verify: VerifyMode,
135    #[serde(default)]
136    pub meta_columns: MetaColumns,
137    #[serde(default)]
138    pub quality: Option<QualityConfig>,
139    pub max_file_size: Option<String>,
140    #[serde(default)]
141    pub chunk_checkpoint: bool,
142    pub chunk_max_attempts: Option<u32>,
143    #[serde(default)]
144    pub tuning: Option<TuningConfig>,
145    /// Optional logical group for shared source capacity (replica, host). Advisory prioritization only.
146    #[serde(default)]
147    pub source_group: Option<String>,
148    /// Hint (Epic C / ADR-0006) that this export should always be treated as reconcile-heavy
149    /// by planning, independent of the `--reconcile` CLI flag. Advisory only.
150    #[serde(default)]
151    pub reconcile_required: bool,
152
153    /// Per-column type overrides (roadmap §8). Keys are column names; values
154    /// are short type strings such as `decimal(18,2)`, `timestamp_tz`, `json`.
155    ///
156    /// ```yaml
157    /// exports:
158    ///   - name: payments
159    ///     columns:
160    ///       amount: decimal(18,2)
161    ///       fee: decimal(18,6)
162    ///       created_at: timestamp_tz
163    /// ```
164    ///
165    /// Overrides take priority over autodetection and are validated at
166    /// plan time — an invalid type string fails before the export runs.
167    #[serde(default)]
168    pub columns: std::collections::HashMap<String, String>,
169
170    /// Downstream warehouse this export targets (`bigquery` / `bq`,
171    /// `duckdb`). When set, `rivet check --type-report` resolves each column
172    /// against it (native type, honest autoload type, recovery hint) without
173    /// needing `--target` on the CLI — the CLI flag still wins when both are
174    /// present. The Parquet interchange stays target-neutral (ADR-0014 T2);
175    /// `target:` only drives guidance and the future load-schema artifact.
176    ///
177    /// ```yaml
178    /// exports:
179    ///   - name: payments
180    ///     target: bigquery
181    /// ```
182    #[serde(default)]
183    pub target: Option<String>,
184
185    /// Policy applied when structural schema drift is detected (column added, removed, or retyped).
186    /// Defaults to `warn`: log a warning and continue.
187    #[serde(default)]
188    pub on_schema_drift: SchemaDriftPolicy,
189
190    /// Growth-factor threshold for data shape drift warnings (Epic 8).
191    /// When a string/binary column's max observed byte length in the current run
192    /// exceeds `stored_max * shape_drift_warn_factor`, Rivet logs a warning.
193    /// `None` uses the default of 2.0. Set to `0.0` to disable shape tracking.
194    #[serde(default)]
195    pub shape_drift_warn_factor: Option<f64>,
196
197    /// Parquet row group tuning. Only meaningful when `format: parquet`.
198    /// When absent, the parquet library default (1,048,576 rows/group) is used.
199    #[serde(default)]
200    pub parquet: Option<ParquetConfig>,
201}
202
203impl ExportConfig {
204    /// Resolve the effective `(CompressionType, level)` for this export.
205    /// `compression_profile` takes precedence over `compression` + `compression_level`.
206    pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
207        if let Some(profile) = self.compression_profile {
208            profile.to_codec()
209        } else {
210            (self.compression, self.compression_level)
211        }
212    }
213
214    pub fn max_file_size_bytes(&self) -> Option<u64> {
215        self.max_file_size
216            .as_ref()
217            .and_then(|s| parse_file_size(s).ok())
218    }
219
220    pub fn resolve_query(
221        &self,
222        config_dir: &Path,
223        params: Option<&std::collections::HashMap<String, String>>,
224    ) -> crate::error::Result<String> {
225        // table: shortcut takes precedence — already validated by
226        // `validate_business_rules` to be mutually exclusive with query/query_file.
227        if let Some(tbl) = &self.table {
228            validate_table_shortcut_ident(&self.name, tbl)?;
229            return Ok(format!("SELECT * FROM {tbl}"));
230        }
231        match (&self.query, &self.query_file) {
232            (Some(q), None) => {
233                if params.is_some() {
234                    resolve_vars(q, params)
235                } else {
236                    Ok(q.clone())
237                }
238            }
239            (None, Some(file)) => {
240                let file_path = std::path::Path::new(file);
241                // SecOps: block absolute paths and `..` traversal components.
242                if file_path.is_absolute() {
243                    anyhow::bail!(
244                        "export '{}': query_file must be a relative path: '{}'",
245                        self.name,
246                        file
247                    );
248                }
249                if file_path
250                    .components()
251                    .any(|c| c == std::path::Component::ParentDir)
252                {
253                    anyhow::bail!(
254                        "export '{}': query_file path must not contain '..': '{}'",
255                        self.name,
256                        file
257                    );
258                }
259                let joined = config_dir.join(file);
260                // Canonicalize-based check catches symlink-based evasion for files
261                // that already exist on disk.
262                if let Ok(canonical) = joined.canonicalize() {
263                    let base = config_dir
264                        .canonicalize()
265                        .unwrap_or_else(|_| config_dir.to_path_buf());
266                    if !canonical.starts_with(&base) {
267                        anyhow::bail!(
268                            "export '{}': query_file '{}' resolves outside the config directory",
269                            self.name,
270                            file
271                        );
272                    }
273                }
274                let raw = std::fs::read_to_string(&joined)?;
275                resolve_vars(&raw, params)
276            }
277            (Some(_), Some(_)) => {
278                anyhow::bail!(
279                    "export '{}': specify either 'query' or 'query_file', not both",
280                    self.name
281                )
282            }
283            (None, None) => {
284                anyhow::bail!(
285                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
286                    self.name
287                )
288            }
289        }
290    }
291}
292
293/// Validate the value of the `table:` YAML shortcut.
294///
295/// Accepts ASCII identifiers in the form `<table>` or `<schema>.<table>`. Each
296/// segment must match `[A-Za-z_][A-Za-z0-9_]*`. Anything else (quoted
297/// identifiers, exotic chars, three-part names, SQL injection attempts) is
298/// rejected — the user should fall back to `query:` for those cases.
299///
300/// The bound on identifier shape keeps generated SQL safe to interpolate
301/// without quoting and ensures the generated `SELECT * FROM <ident>` form is
302/// recognised by the PG catalog-hint parser ([src/source/postgres.rs]).
303fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
304    let trimmed = raw.trim();
305    if trimmed.is_empty() {
306        anyhow::bail!("export '{export_name}': 'table' is empty");
307    }
308    let parts: Vec<&str> = trimmed.split('.').collect();
309    if parts.len() > 2 {
310        anyhow::bail!(
311            "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
312        );
313    }
314    for part in &parts {
315        if part.is_empty() {
316            anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
317        }
318        let mut chars = part.chars();
319        let first = chars.next().unwrap();
320        if !(first.is_ascii_alphabetic() || first == '_') {
321            anyhow::bail!(
322                "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
323            );
324        }
325        if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
326            anyhow::bail!(
327                "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
328            );
329        }
330    }
331    Ok(())
332}
333
334#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
335#[serde(deny_unknown_fields)]
336pub struct QualityConfig {
337    pub row_count_min: Option<usize>,
338    pub row_count_max: Option<usize>,
339    #[serde(default)]
340    pub null_ratio_max: std::collections::HashMap<String, f64>,
341    #[serde(default)]
342    pub unique_columns: Vec<String>,
343    /// Cap on the number of distinct values tracked per column during uniqueness checks.
344    /// When the limit is hit, a Warn issue is emitted and tracking stops for that column.
345    /// Prevents unbounded HashSet growth on high-cardinality columns.
346    pub unique_max_entries: Option<usize>,
347}
348
349#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
350#[serde(deny_unknown_fields)]
351pub struct MetaColumns {
352    #[serde(default)]
353    pub exported_at: bool,
354    #[serde(default)]
355    pub row_hash: bool,
356}
357
358fn default_mode() -> ExportMode {
359    ExportMode::Full
360}
361
/// Serde default for `chunk_size:` — one hundred thousand rows per chunk.
fn default_chunk_size() -> usize {
    const ROWS_PER_CHUNK: usize = 100_000;
    ROWS_PER_CHUNK
}
365
/// Serde default for `parallel:` — no parallelism (a single worker).
fn default_parallel() -> usize {
    const SINGLE: usize = 1;
    SINGLE
}
369
370fn default_time_column_type() -> TimeColumnType {
371    TimeColumnType::Timestamp
372}
373
374#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
375#[serde(rename_all = "snake_case")]
376pub enum ExportMode {
377    Full,
378    Incremental,
379    Chunked,
380    TimeWindow,
381}
382
383#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
384#[serde(rename_all = "lowercase")]
385pub enum TimeColumnType {
386    Timestamp,
387    Unix,
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};

    // ── ExportConfig::max_file_size_bytes ───────────────────────────────────

    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n  type: local\n  path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // ── ExportConfig::resolve_query ─────────────────────────────────────────

    // Build a minimal ExportConfig directly, bypassing Config::from_yaml validation.
    // This lets us test the four branches inside resolve_query itself, including
    // the (both-set / neither-set) error paths that are normally prevented by the
    // top-level validator.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: "test".into(),
            target: None,
            verify: VerifyMode::Size,
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            table: None,
            mode: ExportMode::Full,
            cursor_column: None,
            cursor_fallback_column: None,
            incremental_cursor_mode: Default::default(),
            chunk_column: None,
            chunk_dense: false,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            chunk_by_days: None,
            chunk_by_key: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::None,
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp".into()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // Rely on a deliberately obscure variable name instead of calling
        // `std::env::remove_var`: the libtest harness runs tests on parallel
        // threads, so mutating the process environment here would be unsound.
        // Reading the environment is safe, so assert the precondition instead.
        assert!(
            std::env::var_os("UNSET_RIVET_TEST_VAR").is_none(),
            "precondition: UNSET_RIVET_TEST_VAR must not be set in the test environment"
        );
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // ── `table:` shortcut ───────────────────────────────────────────────────

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_accepts_surrounding_whitespace() {
        // Validation trims the value, so stray YAML whitespace must not be fatal.
        let mut exp = make_export_direct(None, None);
        exp.table = Some("  public.users \n".into());
        assert!(exp.resolve_query(Path::new("/tmp"), None).is_ok());
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            "   ",
            ".trailing",
            "leading.",
            "two..dots",
            "public . users",
            "pub lic.users",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(msg.contains("exactly one of"), "got: {msg}");
    }

    // ── SecOps: query_file path traversal prevention ──────────────────────────

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}