Skip to main content

rivet/config/
mod.rs

1pub mod cursor;
2mod destination;
3mod export;
4mod format;
5mod lints;
6mod notifications;
7pub mod resolve;
8pub mod schema;
9mod source;
10
11pub use cursor::IncrementalCursorMode;
12pub use destination::*;
13pub use export::*;
14pub use format::*;
15pub use notifications::*;
16#[allow(unused_imports)]
17pub(crate) use resolve::resolve_env_vars;
18pub use resolve::{parse_file_size, resolve_vars};
19pub use schema::generate_config_schema_pretty;
20pub use source::*;
21
22use schemars::JsonSchema;
23use serde::Deserialize;
24
25/// Top-level Rivet configuration root.
26///
27/// Operators write this struct as YAML (typically `rivet.yaml`).  The
28/// `JsonSchema` derive is the source of truth for the `schemas/rivet.schema.json`
29/// artifact and the `rivet schema config` command's output (v0.7.3 P0).
30#[derive(Debug, Deserialize, JsonSchema, Clone)]
31#[serde(deny_unknown_fields)]
32pub struct Config {
33    pub source: SourceConfig,
34    pub exports: Vec<ExportConfig>,
35    #[serde(default)]
36    pub notifications: Option<NotificationsConfig>,
37    #[serde(default)]
38    pub parallel_exports: bool,
39    #[serde(default)]
40    pub parallel_export_processes: bool,
41}
42
43impl Config {
44    pub fn load(path: &str) -> crate::error::Result<Self> {
45        Self::load_with_params(path, None)
46    }
47
48    pub fn load_with_params(
49        path: &str,
50        params: Option<&std::collections::HashMap<String, String>>,
51    ) -> crate::error::Result<Self> {
52        // F11 (0.7.5 audit): raw `std::io::Error` lost the path on
53        // not-found.  Wrap with the file path + a hint so the operator
54        // can see *which* config the tool could not open.
55        let contents = std::fs::read_to_string(path).map_err(|e| {
56            if e.kind() == std::io::ErrorKind::NotFound {
57                anyhow::anyhow!(
58                    "config file '{}' not found.\n  Hint: check the path, or run `rivet init` to generate one.",
59                    path
60                )
61            } else {
62                anyhow::anyhow!("cannot read config file '{}': {}", path, e)
63            }
64        })?;
65        // Warn about typo'd `--param` keys once per CLI invocation, using the
66        // un-resolved YAML as the haystack so the placeholders are still there.
67        // We pass the raw `contents` (not `resolved`) on purpose: after
68        // resolution the placeholders are gone, and every key would look unused.
69        resolve::warn_unused_params(&contents, params);
70        let resolved = resolve_vars(&contents, params)?;
71        // F12 (0.7.5 audit): YAML parse errors did not name the config
72        // file.  When loading from disk we know the path — thread it
73        // into the parse error.
74        Self::from_yaml(&resolved).map_err(|e| anyhow::anyhow!("config file '{}': {:#}", path, e))
75    }
76
77    pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
78        // Intercept the unquoted-`{partition}` footgun before the raw-YAML
79        // pre-scans below (they `from_str::<Value>` too and would re-emit the
80        // same cryptic libyaml message). A document that does not even parse as
81        // a `Value` is malformed; if the failure looks like a flow-mapping
82        // scanner error *and* the source carries an unquoted brace value, point
83        // straight at the quoting fix. On a valid config this `Value` parse
84        // succeeds and the block is skipped, so the success path is unchanged.
85        if let Err(e) = serde_yaml_ng::from_str::<serde_yaml_ng::Value>(yaml) {
86            let m = e.to_string();
87            if let Some(hint) = Self::unquoted_template_brace_hint(yaml, &m) {
88                return Err(anyhow::anyhow!("{e}\n  {hint}"));
89            }
90            // A TAB in indentation is the most common beginner YAML mistake; it
91            // trips libyaml before serde ever sees a field, so it must be caught
92            // here in the raw scan, not in `enhance_parse_error`.
93            if let Some(hint) = Self::tab_indent_hint(yaml, &m) {
94                return Err(anyhow::anyhow!("{e}\n  {hint}"));
95            }
96        }
97        Self::check_misplaced_tuning_fields(yaml)?;
98        Self::check_csv_compression(yaml)?;
99        Self::check_tls_mode_downgrade(yaml)?;
100        let config: Config = serde_yaml_ng::from_str(yaml).map_err(|e| {
101            // A well-formed flow map (`prefix: {partition}`) parses as a YAML
102            // value but serde then rejects it with `invalid type: map, expected
103            // a string`. That is the same unquoted-brace footgun, surfacing one
104            // layer later than the scanner errors caught above — so try the same
105            // hint here before falling back to the generic field-typo enhancer.
106            if let Some(hint) = Self::unquoted_template_brace_hint(yaml, &e.to_string()) {
107                anyhow::anyhow!("{e}\n  {hint}")
108            } else {
109                lints::enhance_parse_error(e)
110            }
111        })?;
112        config.validate()?;
113        Ok(config)
114    }
115
116    /// Detect the unquoted-`{partition}` (or `{date}`, …) template footgun and
117    /// return an actionable quoting hint, or `None` when the error is unrelated.
118    ///
119    /// A YAML value that *starts* a flow mapping — `prefix: {partition}` — is
120    /// the common copy-paste mistake: `{partition}` is the required token for
121    /// `partition_by`, but unquoted it parses as a YAML map (or, with trailing
122    /// text, trips the libyaml scanner). serde then emits a cryptic
123    /// `did not find expected ',' or '}'` / `while parsing a flow mapping` /
124    /// `invalid type: map, expected a string` with no hint that a pair of
125    /// quotes is the fix.
126    ///
127    /// Two guards keep this from firing on unrelated parse errors: the error
128    /// message must carry one of the flow-mapping symptoms, AND the raw source
129    /// must actually contain an unquoted `{…}` value. Both must hold, so a
130    /// valid config (every brace value quoted) never sees the hint, and a
131    /// genuine map-typed field error elsewhere is left alone.
132    /// A YAML document indented with a TAB trips libyaml with `found character
133    /// that cannot start any token`. Point straight at the fix (spaces, not
134    /// tabs) — but only when the cited line really begins with a tab, so an
135    /// unrelated scanner error is left with its original message.
136    fn tab_indent_hint(yaml: &str, err_msg: &str) -> Option<String> {
137        if !err_msg.contains("tab character") {
138            return None;
139        }
140        // serde_yaml_ng reports `found a tab character … at line N column C …`;
141        // the FIRST `line N` is where the offending tab is.
142        let line_no: usize = err_msg
143            .split_once("line ")
144            .and_then(|(_, rest)| rest.split([' ', ',']).next())
145            .and_then(|n| n.parse().ok())?;
146        let line = yaml.lines().nth(line_no.checked_sub(1)?)?;
147        let leading = &line[..line.len() - line.trim_start().len()];
148        leading.contains('\t').then(|| {
149            format!(
150                "line {line_no} is indented with a TAB — YAML requires spaces. Replace the tab(s) with spaces."
151            )
152        })
153    }
154
155    fn unquoted_template_brace_hint(yaml: &str, err_msg: &str) -> Option<String> {
156        const FLOW_SYMPTOMS: &[&str] = &[
157            "did not find expected ',' or '}'",
158            "while parsing a flow mapping",
159            // A bare `key: {token}` parses as a map, then serde rejects the
160            // map where it wanted a scalar — same root cause, later layer.
161            "invalid type: map, expected a string",
162            // `key: {token}/more` runs the flow map into block context.
163            "did not find expected key",
164        ];
165        if !FLOW_SYMPTOMS.iter().any(|s| err_msg.contains(s)) {
166            return None;
167        }
168        if !yaml.lines().any(line_has_unquoted_brace_value) {
169            return None;
170        }
171        Some(
172            "a YAML value containing { } (such as {partition} or {date}) must be quoted, \
173             e.g. prefix: \"exports/{partition}/\""
174                .to_string(),
175        )
176    }
177
178    /// Reject `format: csv` paired with an explicitly-requested compression
179    /// codec (Finding #10). The CSV writer has no compression encoder, so the
180    /// codec is silently dropped on write while the run manifest still records
181    /// it — a degraded, dishonest no-op. We reject loudly at config-validate
182    /// time so `rivet check` / `rivet doctor` catch it before any run.
183    ///
184    /// This is a raw-YAML scan (like [`Self::check_misplaced_tuning_fields`])
185    /// rather than a `validate_export` check on purpose: `ExportConfig.
186    /// compression` is `#[serde(default)]` and `CompressionType::default()` is
187    /// `Zstd`, so a parsed export cannot distinguish "user asked for zstd" from
188    /// "user omitted the field". Only a user who *wrote* `compression:`/
189    /// `compression_profile:` is asking for something the CSV writer cannot
190    /// honour; the bare-`format: csv` default writes uncompressed and is fine.
191    fn check_csv_compression(yaml: &str) -> crate::error::Result<()> {
192        let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
193        let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) else {
194            return Ok(());
195        };
196        for export in exports {
197            if export.get("format").and_then(|f| f.as_str()) != Some("csv") {
198                continue;
199            }
200            let name = export
201                .get("name")
202                .and_then(|n| n.as_str())
203                .unwrap_or("<unnamed>");
204
205            // Explicit `compression:` codec that the CSV writer cannot apply.
206            // An unrecognised label is left for serde to reject during the real
207            // parse; we only act on a codec we understand and that CSV cannot
208            // honour (everything but `none`).
209            if let Some(codec) = export.get("compression").and_then(|c| c.as_str())
210                && let Some(ct) = CompressionType::from_label(codec)
211                && !format::compression_supported(FormatType::Csv, ct)
212            {
213                anyhow::bail!(
214                    "export '{}': CSV output does not support compression: {}. \
215                     CSV has no compression encoder, so the codec would be silently dropped \
216                     while the manifest records it.\n  \
217                     Hint: use `format: parquet` for compression, or set `compression: none`.",
218                    name,
219                    codec,
220                );
221            }
222
223            // A `compression_profile:` other than `none` resolves to a real
224            // codec too (fast→snappy, balanced/compact→zstd) — same no-op.
225            if let Some(profile) = export.get("compression_profile").and_then(|c| c.as_str())
226                && profile != CompressionProfile::None.label()
227            {
228                anyhow::bail!(
229                    "export '{}': CSV output does not support compression_profile: {} \
230                     (it resolves to a compression codec the CSV writer cannot apply).\n  \
231                     Hint: use `format: parquet` for compression, or set `compression_profile: none`.",
232                    name,
233                    profile,
234                );
235            }
236        }
237        Ok(())
238    }
239
240    /// V13: reject a `source.tls` block that pairs an *explicitly chosen*
241    /// enforced `mode:` with a verification-disabling danger knob
242    /// (`accept_invalid_certs` / `accept_invalid_hostnames`). `mode: verify-full`
243    /// promises chain + hostname verification, but the knob silently downgrades
244    /// it to "trust anything" — a MITM exposure that contradicts the stated
245    /// intent (see `src/source/tls.rs::build_native_tls`, whose comment claims
246    /// this is warned about at config-time but is not).
247    ///
248    /// Like [`Self::check_csv_compression`], this is a raw-YAML scan rather than
249    /// a `validate` check on purpose: `TlsMode` is `#[serde(default)]` and the
250    /// default is `VerifyFull`, so a parsed config cannot distinguish "user
251    /// wrote `mode: verify-full`" (a contradiction to flag) from "user omitted
252    /// `mode:`" (the common dev-container case `tls: { accept_invalid_certs:
253    /// true }` against a loopback self-signed cert — which must keep working).
254    /// Only an *explicit* enforced `mode:` next to a danger knob is the footgun.
255    fn check_tls_mode_downgrade(yaml: &str) -> crate::error::Result<()> {
256        let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
257        let Some(tls) = root.get("source").and_then(|s| s.get("tls")) else {
258            return Ok(());
259        };
260
261        // Only an explicitly written `mode:` is a deliberate, contradicted
262        // choice; an omitted mode is the dev-container default path.
263        let Some(mode) = tls.get("mode").and_then(|m| m.as_str()) else {
264            return Ok(());
265        };
266        // `disable` carries no verification promise to contradict; the danger
267        // knobs are a no-op there. Flag only the enforced modes.
268        if mode == "disable" {
269            return Ok(());
270        }
271
272        let knob = if tls
273            .get("accept_invalid_certs")
274            .and_then(|v| v.as_bool())
275            .unwrap_or(false)
276        {
277            Some("accept_invalid_certs")
278        } else if tls
279            .get("accept_invalid_hostnames")
280            .and_then(|v| v.as_bool())
281            .unwrap_or(false)
282        {
283            Some("accept_invalid_hostnames")
284        } else {
285            None
286        };
287
288        if let Some(knob) = knob {
289            anyhow::bail!(
290                "source.tls: {} disables certificate verification, silently downgrading the \
291                 chosen `mode: {}` to trust-anything (MITM exposure — credentials and rows \
292                 readable/forgeable on the wire).\n  \
293                 Hint: drop the danger knob and trust a private CA with `tls.ca_file: <pem>`; \
294                 only use a danger knob for a loopback self-signed dev container, and then omit \
295                 the explicit `mode:` so the contradiction is gone.",
296                knob,
297                mode,
298            );
299        }
300        Ok(())
301    }
302
303    /// Detect tuning-related fields placed directly under `source:` or an
304    /// `exports[]` entry instead of inside the `tuning:` sub-key. Without this
305    /// check serde silently ignores unknown keys and the user gets unexpected
306    /// defaults (e.g. batch_size=10 000 instead of the intended 1 000).
307    fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
308        const TUNING_FIELDS: &[&str] = &[
309            "batch_size",
310            "batch_size_memory_mb",
311            "throttle_ms",
312            "statement_timeout_s",
313            "max_retries",
314            "retry_backoff_ms",
315            "lock_timeout_s",
316            "memory_threshold_mb",
317            "profile",
318        ];
319
320        let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
321
322        if let Some(source) = root.get("source") {
323            let misplaced: Vec<&str> = TUNING_FIELDS
324                .iter()
325                .copied()
326                .filter(|&f| source.get(f).is_some())
327                .collect();
328            if !misplaced.is_empty() {
329                anyhow::bail!(
330                    "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
331                     Example:\n  source:\n    tuning:\n      {}: <value>",
332                    misplaced.join(", "),
333                    misplaced[0],
334                );
335            }
336        }
337
338        if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
339            for (i, export) in exports.iter().enumerate() {
340                let name = export
341                    .get("name")
342                    .and_then(|n| n.as_str())
343                    .unwrap_or("<unnamed>");
344                let misplaced: Vec<&str> = TUNING_FIELDS
345                    .iter()
346                    .copied()
347                    .filter(|&f| export.get(f).is_some())
348                    .collect();
349                if !misplaced.is_empty() {
350                    anyhow::bail!(
351                        "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
352                         not directly in the export. Example:\n  exports:\n    - name: {}\n      tuning:\n        {}: <value>",
353                        name,
354                        i,
355                        misplaced.join(", "),
356                        name,
357                        misplaced[0],
358                    );
359                }
360            }
361        }
362
363        Ok(())
364    }
365
366    /// Reject a config before any plan/connect step. The body is split into
367    /// three cohesive validators so each can be read — and unit-tested — on its
368    /// own: the export-list shape, the source connection block, and the
369    /// per-export rules. The end-to-end surface (`Config::from_yaml`) is
370    /// covered by `config/tests/{validation,secops}.rs`; the split additionally
371    /// lets a rule be exercised directly via `validate_export`.
372    fn validate(&self) -> crate::error::Result<()> {
373        self.validate_exports_list()?;
374        self.validate_source_connection()?;
375        for export in &self.exports {
376            self.validate_export(export)?;
377        }
378        Ok(())
379    }
380
381    /// Whole-config shape: at least one export, names unique.
382    fn validate_exports_list(&self) -> crate::error::Result<()> {
383        // An empty `exports:` list is almost always a typo (wrong config file,
384        // dropped anchor, merged doc with the anchor section missing). Running
385        // with zero exports is a silent no-op that looks like success in CI;
386        // reject fast instead. See QA backlog Task 5.1.
387        if self.exports.is_empty() {
388            anyhow::bail!("exports: at least one export must be defined (got empty list)");
389        }
390
391        // Duplicate export names break state tracking: `export_state`,
392        // `file_log`, and `chunk_run` are all keyed by `export_name`, so
393        // two configs with the same name silently share cursor/file-log rows.
394        // QA backlog Task 5.1.
395        let mut seen: std::collections::HashSet<&str> =
396            std::collections::HashSet::with_capacity(self.exports.len());
397        for e in &self.exports {
398            if !seen.insert(e.name.as_str()) {
399                anyhow::bail!(
400                    "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
401                    e.name
402                );
403            }
404        }
405        Ok(())
406    }
407
408    /// Source connection block: exactly one connection method, well-formed,
409    /// and the source-level tuning that is shared by every export.
410    fn validate_source_connection(&self) -> crate::error::Result<()> {
411        if let Some(t) = &self.source.tuning
412            && t.batch_size.is_some()
413            && t.batch_size_memory_mb.is_some()
414        {
415            anyhow::bail!(
416                "tuning: batch_size and batch_size_memory_mb are mutually exclusive. \
417                 Prefer batch_size_memory_mb (rivet sizes the batch to a memory budget, \
418                 adapting to row width); set batch_size only to pin an exact row count."
419            );
420        }
421
422        if !self.source.has_url_fields() && !self.source.has_structured_fields() {
423            // First-run footgun: a config that forgot the source block
424            // entirely.  Show the recommended path (`url_env`) up-front;
425            // operators who actually want structured fields know to look
426            // for them.
427            anyhow::bail!(
428                "source: no connection method configured. Add one of:\n  url_env: DATABASE_URL                          (URL from env var — recommended)\n  url: 'postgresql://user:pass@host:5432/db'      (inline — not recommended for committed configs)\n  url_file: /etc/rivet/source.url                 (URL from file — rotation-friendly)\n  host/user/database/...                          (structured fields under `source:`)"
429            );
430        }
431
432        if self.source.has_url_fields() {
433            let url_count = [
434                &self.source.url,
435                &self.source.url_env,
436                &self.source.url_file,
437            ]
438            .iter()
439            .filter(|u| u.is_some())
440            .count();
441            if url_count > 1 {
442                anyhow::bail!(
443                    "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n  Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
444                    url_count
445                );
446            }
447        }
448
449        if self.source.has_url_fields() && self.source.has_structured_fields() {
450            anyhow::bail!(
451                "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n  Hint: remove whichever block you don't want; mixing the two is ambiguous."
452            );
453        }
454
455        if self.source.has_structured_fields() {
456            if self.source.host.is_none() {
457                anyhow::bail!(
458                    "source: structured config is missing 'host'.\n  Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n  Or switch to URL-based config: `url_env: DATABASE_URL`."
459                );
460            }
461            if self.source.user.is_none() {
462                anyhow::bail!(
463                    "source: structured config is missing 'user'.\n  Hint: add `user: <username>` under `source:` in rivet.yaml."
464                );
465            }
466            if self.source.database.is_none() {
467                anyhow::bail!(
468                    "source: structured config is missing 'database'.\n  Hint: add `database: <dbname>` under `source:` in rivet.yaml."
469                );
470            }
471            if self.source.password.is_some() && self.source.password_env.is_some() {
472                anyhow::bail!(
473                    "source: specify 'password' OR 'password_env', not both.\n  Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
474                );
475            }
476        }
477        Ok(())
478    }
479
480    /// Per-export rules: effective tuning, query source, `query_file` SecOps,
481    /// destination auth, compression, and the mode/chunk matrix. Takes `&self`
482    /// because effective tuning merges the source-level block.
483    fn validate_export(&self, export: &ExportConfig) -> crate::error::Result<()> {
484        // V5: `name` is keyed into output paths, file logs, and on-disk state,
485        // yet is otherwise free-form. A traversal (`../../etc/x`), absolute or
486        // slash-bearing (`/abs/x`, `sub/dir`), leading-dot, or NUL-bearing name
487        // escapes the intended output tree and corrupts name-keyed state.
488        // Mirror the `query_file` `..`/absolute guard: reject at config-load,
489        // accepting only a filename-safe charset.
490        if !is_filename_safe_name(&export.name) {
491            anyhow::bail!(
492                "export name '{}' is not filename-safe: it must not be absolute, contain \
493                 '/', '\\', '..', a NUL, or start with '.' (the name is used in output paths \
494                 and state keys). Use a plain identifier like `orders` or `daily_events`.",
495                export.name.escape_default(),
496            );
497        }
498
499        let merged =
500            crate::tuning::merge_tuning_config(self.source.tuning.as_ref(), export.tuning.as_ref());
501        if let Some(t) = merged
502            && t.batch_size.is_some()
503            && t.batch_size_memory_mb.is_some()
504        {
505            anyhow::bail!(
506                "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
507                export.name
508            );
509        }
510        if let Some(et) = &export.tuning
511            && et.batch_size.is_some()
512            && et.batch_size_memory_mb.is_some()
513        {
514            anyhow::bail!(
515                "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
516                export.name
517            );
518        }
519
520        let set_count = [
521            export.query.is_some(),
522            export.query_file.is_some(),
523            export.table.is_some(),
524        ]
525        .iter()
526        .filter(|b| **b)
527        .count();
528        if set_count == 0 {
529            anyhow::bail!(
530                "export '{}': specify exactly one of 'query', 'query_file', or 'table'. \
531                 Use table: <name> for a whole table (enables PK auto-chunking); \
532                 query: \"SELECT …\" for an inline one-liner; \
533                 query_file: <path> for SQL you keep in version control.",
534                export.name
535            );
536        }
537        if set_count > 1 {
538            anyhow::bail!(
539                "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
540                export.name,
541                set_count
542            );
543        }
544        // SecOps: syntactic `query_file` checks must run at config-validate
545        // time so `rivet check` / `rivet doctor` catch them before any
546        // plan step. The same checks repeat (with a canonicalize-based
547        // symlink probe) in `ExportConfig::resolve_query` because the
548        // file may have been swapped between validation and read.
549        if let Some(file) = &export.query_file {
550            let p = std::path::Path::new(file);
551            if p.is_absolute() {
552                anyhow::bail!(
553                    "export '{}': query_file must be a relative path: '{}'",
554                    export.name,
555                    file
556                );
557            }
558            if p.components().any(|c| c == std::path::Component::ParentDir) {
559                anyhow::bail!(
560                    "export '{}': query_file path must not contain '..': '{}'",
561                    export.name,
562                    file
563                );
564            }
565        }
566        // V2/V12: a custom cloud `endpoint` is handed straight to the opendal
567        // S3/GCS/Azure builder with no validation, so a committed config can
568        // silently redirect every upload to an attacker host (exfiltration) or
569        // send credentials + rows over cleartext `http://`. The legitimate use
570        // is a local emulator (Minio / Azurite / fake-gcs on `127.0.0.1`), so
571        // accept a loopback host (any scheme), and otherwise accept a remote
572        // endpoint only when the operator has explicitly opted into anonymous
573        // (emulator) mode. Reject every other custom endpoint at config-load.
574        if matches!(
575            export.destination.destination_type,
576            DestinationType::S3 | DestinationType::Gcs | DestinationType::Azure
577        ) && let Some(endpoint) = &export.destination.endpoint
578        {
579            // Loopback emulator (Minio/Azurite/fake-gcs) is the legitimate
580            // local-dev path — accept any scheme. A non-loopback (or
581            // unparseable) custom endpoint is only accepted when the operator
582            // has explicitly opted into anonymous (emulator) mode, where no
583            // credentials are sent. Everything else is rejected.
584            let loopback = endpoint_host(endpoint).is_some_and(|host| is_loopback_host(&host));
585            if !loopback && !export.destination.allow_anonymous {
586                anyhow::bail!(
587                    "export '{}': destination.endpoint '{}' points at a non-loopback host. \
588                     A custom endpoint redirects every upload there — committing one is a \
589                     data-exfiltration / cleartext-credential risk.\n  \
590                     Hint: drop `endpoint:` to use the provider default, point it at a \
591                     loopback emulator (e.g. http://127.0.0.1:9000 with allow_anonymous: true \
592                     for Minio/Azurite), or set `allow_anonymous: true` for an anonymous \
593                     emulator.",
594                    export.name,
595                    endpoint,
596                );
597            }
598        }
599
600        // V15: a `type: local` destination `path` (or `prefix`) is written
601        // verbatim to the filesystem. A `..` component lets a committed config
602        // climb out of the intended output tree (`../../../../tmp/x`) — mirror
603        // the `query_file` traversal guard and reject it at config-load.
604        //
605        // Absolute paths are deliberately *not* rejected: `path: /output` is a
606        // legitimate Docker volume-mount pattern (see `examples/rivet.yaml`) and
607        // an explicit operator choice, not a hidden escape. The `..` climb is
608        // the unambiguous traversal footgun.
609        if export.destination.destination_type == DestinationType::Local {
610            for (field, value) in [
611                ("path", export.destination.path.as_deref()),
612                ("prefix", export.destination.prefix.as_deref()),
613            ] {
614                let Some(value) = value else { continue };
615                if std::path::Path::new(value)
616                    .components()
617                    .any(|c| c == std::path::Component::ParentDir)
618                {
619                    anyhow::bail!(
620                        "export '{}': local destination {} must not contain a '..' component: \
621                         '{}' (a parent-dir climb writes outside the output tree).",
622                        export.name,
623                        field,
624                        value
625                    );
626                }
627            }
628        }
629
630        if export.destination.destination_type == DestinationType::S3 {
631            let ak = export.destination.access_key_env.is_some();
632            let sk = export.destination.secret_key_env.is_some();
633            if ak != sk {
634                anyhow::bail!(
635                    "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
636                    export.name
637                );
638            }
639        }
640
641        if export.destination.destination_type == DestinationType::Gcs
642            && export.destination.allow_anonymous
643            && export.destination.credentials_file.is_some()
644        {
645            anyhow::bail!(
646                "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
647                export.name
648            );
649        }
650
651        if export.destination.destination_type == DestinationType::Azure {
652            let has_name = export.destination.account_name.is_some();
653            let has_key = export.destination.account_key_env.is_some();
654            let has_sas = export.destination.sas_token_env.is_some();
655            if export.destination.allow_anonymous {
656                if has_name || has_key || has_sas {
657                    anyhow::bail!(
658                        "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
659                        export.name
660                    );
661                }
662            } else if has_key && has_sas {
663                anyhow::bail!(
664                    "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
665                    export.name
666                );
667            } else if !has_name {
668                anyhow::bail!(
669                    "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
670                    export.name
671                );
672            } else if !has_key && !has_sas {
673                anyhow::bail!(
674                    "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
675                    export.name
676                );
677            }
678        }
679
680        if let Some(cred_path) = &export.destination.credentials_file
681            && !std::path::Path::new(cred_path).exists()
682        {
683            anyhow::bail!(
684                "export '{}': credentials_file '{}' does not exist",
685                export.name,
686                cred_path
687            );
688        }
689
690        if let Some(ref size_str) = export.max_file_size {
691            parse_file_size(size_str).map_err(|_| {
692                anyhow::anyhow!(
693                    "export '{}': invalid max_file_size '{}'",
694                    export.name,
695                    size_str
696                )
697            })?;
698        }
699
700        if let Some(level) = export.compression_level {
701            match export.compression {
702                CompressionType::Zstd => {
703                    if !(1..=22).contains(&level) {
704                        anyhow::bail!(
705                            "export '{}': zstd compression_level must be 1..22, got {}",
706                            export.name,
707                            level
708                        );
709                    }
710                }
711                CompressionType::Gzip => {
712                    if level > 10 {
713                        anyhow::bail!(
714                            "export '{}': gzip compression_level must be 0..10, got {}",
715                            export.name,
716                            level
717                        );
718                    }
719                }
720                _ => {
721                    anyhow::bail!(
722                        "export '{}': compression_level is only supported for zstd and gzip",
723                        export.name
724                    );
725                }
726            }
727        }
728
729        match export.mode {
730            ExportMode::Incremental => {
731                if export.cursor_column.is_none() {
732                    anyhow::bail!(
733                        "export '{}': incremental mode requires cursor_column",
734                        export.name
735                    );
736                }
737                match export.incremental_cursor_mode {
738                    IncrementalCursorMode::Coalesce => {
739                        if export.cursor_fallback_column.is_none() {
740                            anyhow::bail!(
741                                "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
742                                export.name
743                            );
744                        }
745                    }
746                    IncrementalCursorMode::SingleColumn => {
747                        if export.cursor_fallback_column.is_some() {
748                            anyhow::bail!(
749                                "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
750                                export.name
751                            );
752                        }
753                    }
754                }
755            }
756            ExportMode::Chunked => {
757                // `chunk_column` is mandatory unless the user used the `table:`
758                // shortcut on a Postgres source — in that case it is auto-resolved
759                // from the table's single-integer PK at plan-build time (see
760                // `crate::plan::build::resolve_chunk_column`).
761                if export.chunk_column.is_none() && export.table.is_none() {
762                    anyhow::bail!(
763                        "export '{}': chunked mode needs a chunking strategy. Pick one:\n  \
764                         chunk_column: <int col>    range chunks on an integer column (most common)\n  \
765                         chunk_by_key: <unique col>  keyset pagination when there's no integer PK\n  \
766                         chunk_count: <N>            split the range into N equal chunks\n  \
767                         chunk_by_days: <D>          time-bucketed chunks (needs a date/timestamp column)\n  \
768                         Or use the `table:` shortcut on a single table — rivet auto-resolves the column from the primary key.",
769                        export.name
770                    );
771                }
772                // chunk_size == 0 would divide the range into zero-width
773                // slices and (before the saturating fix in generate_chunks)
774                // either infinite-loop or produce no progress. QA backlog
775                // Task 5.1.
776                if export.chunk_size == 0 {
777                    anyhow::bail!(
778                        "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
779                        export.name
780                    );
781                }
782                // parallel == 0 means "spawn zero workers". Claiming tasks
783                // with no workers stalls the pipeline. QA backlog Task 5.1.
784                if export.parallel == 0 {
785                    anyhow::bail!(
786                        "export '{}': chunked mode requires parallel >= 1 (got 0)",
787                        export.name
788                    );
789                }
790                if let Some(0) = export.chunk_count {
791                    anyhow::bail!("export '{}': chunk_count must be >= 1", export.name);
792                }
793                if export.chunk_count.is_some() && export.chunk_dense {
794                    anyhow::bail!(
795                        "export '{}': chunk_count and chunk_dense are mutually exclusive. \
796                         Use chunk_count for equal-sized chunks over a sparse key; \
797                         use chunk_dense only when the key has no gaps.",
798                        export.name
799                    );
800                }
801                if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
802                    anyhow::bail!(
803                        "export '{}': chunk_count and chunk_by_days are mutually exclusive. \
804                         Use chunk_count: N to split an integer range into N chunks; \
805                         use chunk_by_days: D to bucket a date/timestamp column by D-day windows.",
806                        export.name
807                    );
808                }
809            }
810            ExportMode::TimeWindow => {
811                if export.time_column.is_none() {
812                    anyhow::bail!(
813                        "export '{}': time_window mode requires time_column",
814                        export.name
815                    );
816                }
817                if export.days_window.is_none() {
818                    anyhow::bail!(
819                        "export '{}': time_window mode requires days_window",
820                        export.name
821                    );
822                }
823            }
824            ExportMode::Full => {}
825        }
826
827        if export.chunk_dense && export.mode != ExportMode::Chunked {
828            anyhow::bail!(
829                "export '{}': chunk_dense is only valid with mode: chunked",
830                export.name
831            );
832        }
833
834        if let Some(days) = export.chunk_by_days {
835            if export.mode != ExportMode::Chunked {
836                anyhow::bail!(
837                    "export '{}': chunk_by_days requires mode: chunked",
838                    export.name
839                );
840            }
841            if export.chunk_dense {
842                anyhow::bail!(
843                    "export '{}': chunk_by_days cannot be combined with chunk_dense",
844                    export.name
845                );
846            }
847            if days == 0 {
848                anyhow::bail!("export '{}': chunk_by_days must be at least 1", export.name);
849            }
850        }
851        Ok(())
852    }
853}
854
/// True when a single YAML line carries a mapping value (text after `key:`)
/// that contains a `{` outside of any quotes — the unquoted-template-brace
/// shape (`prefix: {partition}`, `path: {date}/out`).
///
/// Quote-aware so a properly quoted value (`prefix: "exports/{partition}/"`)
/// does *not* match, and `$`-prefixed braces (`${VAR}` env placeholders) are
/// ignored — they are resolved before the parse and are not the footgun.
///
/// Comment-aware: a whole-line comment, a comment-only value, and an inline
/// comment after a value (`path: ./out  # e.g. {partition}`) never match.
/// Following YAML, a `#` opens a comment only outside quotes and when it is
/// preceded by whitespace, so `path: a#{b}` is still scanned in full.
fn line_has_unquoted_brace_value(line: &str) -> bool {
    // Whole-line comments never carry a value — skip before splitting.
    if line.trim_start().starts_with('#') {
        return false;
    }
    // Split key from value at the first plain-key separator: a colon
    // followed by whitespace or end-of-line.
    let bytes = line.as_bytes();
    let separator = (0..bytes.len()).find(|&i| {
        bytes[i] == b':' && (i + 1 == bytes.len() || bytes[i + 1].is_ascii_whitespace())
    });
    let Some(colon) = separator else {
        return false;
    };
    // `colon` indexes an ASCII byte, so `colon + 1` is a char boundary.
    let value = line[colon + 1..].trim_start();
    // An empty or comment-only value carries no brace to flag.
    if value.is_empty() || value.starts_with('#') {
        return false;
    }

    let vbytes = value.as_bytes();
    let mut in_single = false;
    let mut in_double = false;
    for (j, &c) in vbytes.iter().enumerate() {
        let outside_quotes = !in_single && !in_double;
        match c {
            b'\'' if !in_double => in_single = !in_single,
            b'"' if !in_single => in_double = !in_double,
            // An unquoted `#` preceded by whitespace starts an inline
            // comment: nothing after it belongs to the value, so a brace
            // there must not trigger the quoting hint.
            b'#' if outside_quotes && j > 0 && vbytes[j - 1].is_ascii_whitespace() => {
                return false;
            }
            b'{' if outside_quotes => {
                // Ignore `${...}` env placeholders (resolved pre-parse).
                if j > 0 && vbytes[j - 1] == b'$' {
                    continue;
                }
                return true;
            }
            _ => {}
        }
    }
    false
}
908
/// Extract the lower-cased host from a `scheme://host[:port][/path]` endpoint,
/// or `None` when it does not look like a URL.
///
/// SecOps helper for the cloud-`endpoint` exfiltration guard (V2/V12): the host
/// decides whether a custom endpoint is a local emulator (loopback) or a remote
/// redirect target. We reject every non-loopback custom endpoint regardless of
/// scheme (covering both the exfil and the cleartext-`http` gaps), so only the
/// host is needed. We hand-parse rather than pull in a URL crate — the inputs
/// are operator-typed endpoints, not arbitrary URIs. A bracketed IPv6 literal
/// authority (`http://[::1]:9000`) keeps its address so it compares against the
/// loopback list.
///
/// A hand parser can disagree with the HTTP client's own (WHATWG-style) URL
/// parser. To prevent that, ambiguous shapes return `None`: upstream treats
/// `None` as a non-loopback endpoint, so the check fails closed rather than
/// guessing.
///
/// - The scheme must match RFC 3986 (`ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )`).
///   Otherwise `http:evil.com/x://127.0.0.1` would split at the *second* `://`
///   and report `127.0.0.1`, while a WHATWG parser reads host `evil.com`.
/// - A `\` inside the authority is rejected. WHATWG parsers treat `\` as a path
///   separator for `http`/`https`, so `http://evil.com\@127.0.0.1` targets
///   `evil.com` even though the text after the last `@` looks loopback.
/// - A bracketed IPv6 literal must be closed and may only be followed by a
///   `:port`, so `http://[::1` and `http://[::1]evil.com` are not read as `::1`.
fn endpoint_host(endpoint: &str) -> Option<String> {
    let (scheme, rest) = endpoint.split_once("://")?;
    let scheme_is_valid = match scheme.as_bytes() {
        [first, tail @ ..] => {
            first.is_ascii_alphabetic()
                && tail
                    .iter()
                    .all(|&b| b.is_ascii_alphanumeric() || matches!(b, b'+' | b'-' | b'.'))
        }
        [] => false,
    };
    if !scheme_is_valid {
        return None;
    }
    // Authority ends at the first `/` (path), `?` (query), or `#` (fragment).
    let raw_authority = rest.split(['/', '?', '#']).next().unwrap_or("");
    // A backslash is a path separator to WHATWG parsers for special schemes
    // but plain data to us — refuse the ambiguity instead of picking a side.
    if raw_authority.contains('\\') {
        return None;
    }
    // Any `user[:pass]@` userinfo head is dropped (host is after the last `@`).
    let authority = raw_authority.rsplit('@').next().unwrap_or("");
    let host = if let Some(stripped) = authority.strip_prefix('[') {
        // Bracketed IPv6 literal: the address runs up to the closing `]`,
        // which may only be followed by an optional `:port` suffix.
        let (address, after) = stripped.split_once(']')?;
        if !(after.is_empty() || after.starts_with(':')) {
            return None;
        }
        address
    } else {
        // host[:port] — strip the port suffix.
        authority.split(':').next().unwrap_or("")
    };
    if host.is_empty() {
        return None;
    }
    Some(host.to_ascii_lowercase())
}
946
/// True when `host` names the local machine — the legitimate cloud-emulator
/// target (Minio / Azurite / fake-gcs on `127.0.0.1`). Anything else is a
/// remote host and a potential exfiltration redirect.
///
/// Only the exact name `localhost` is accepted as a hostname. Every other
/// input must *parse* as an IP literal that is loopback (`127.0.0.0/8` or
/// `::1`). A prefix test such as `starts_with("127.")` would accept
/// attacker-controlled DNS names like `127.attacker.com` or
/// `127.0.0.1.evil.com`. Both resolve off-box and would turn the
/// credential-exfil gate (V2/V12) into a bypass.
fn is_loopback_host(host: &str) -> bool {
    if host == "localhost" {
        return true;
    }
    // A bracketed IPv6 literal (`[::1]`) is tolerated in case a caller
    // forwards the authority form unchanged.
    let literal = match host
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
    {
        Some(inner) => inner,
        None => host,
    };
    match literal.parse::<std::net::IpAddr>() {
        Ok(addr) => addr.is_loopback(),
        Err(_) => false,
    }
}
968
/// Whether `name` can be used verbatim as a single path component.
///
/// `ExportConfig.name` is keyed into output paths and on-disk state, so a name
/// such as `../../etc/x`, or an absolute path, would escape the output tree
/// (V5). The following are rejected:
/// - the empty string;
/// - a leading `.` (hidden files and the `.`/`..` directory entries);
/// - any `..` run;
/// - either path separator (`/` or `\`);
/// - embedded NULs.
fn is_filename_safe_name(name: &str) -> bool {
    let hidden_or_traversal = name.is_empty() || name.starts_with('.') || name.contains("..");
    let has_separator_or_nul = name.contains(['/', '\\', '\0']);
    !(hidden_or_traversal || has_separator_or_nul)
}
981
982#[cfg(test)]
983mod tests;
984
#[cfg(test)]
mod audit_csv_compression {
    //! Finding #10: pairing `format: csv` with a compression codec used to be a
    //! silent no-op. The file was written uncompressed while the manifest still
    //! recorded the codec. Config validation now refuses that combination. These
    //! tests pin the rule, so reverting the fix turns them red.
    use super::*;

    /// Minimal single-export config with the given `format:`. An optional,
    /// pre-indented compression line is spliced in just before `destination:`.
    fn yaml(format: &str, compression_line: &str) -> String {
        format!(
            "source:\n  type: postgres\n  url: \"postgresql://localhost/test\"\n\
             exports:\n  - name: t\n    query: \"SELECT 1\"\n    format: {format}\n\
             {compression_line}    destination:\n      type: local\n      path: ./out\n"
        )
    }

    /// Load `doc`, require that validation fails, and return the full
    /// `{:#}`-rendered error chain for substring checks.
    fn rejection_message(doc: &str) -> String {
        let err = Config::from_yaml(doc).unwrap_err();
        format!("{err:#}")
    }

    #[test]
    fn audit_csv_compression_is_rejected() {
        // csv + gzip is refused, and the message explains why and what to do.
        let msg = rejection_message(&yaml("csv", "    compression: gzip\n"));
        let explains_problem =
            msg.contains("CSV output does not support compression") && msg.contains("gzip");
        assert!(
            explains_problem,
            "csv+gzip must be rejected with an actionable message; got: {msg}"
        );
        let names_alternatives = msg.contains("parquet") && msg.contains("none");
        assert!(
            names_alternatives,
            "message must point to the real options (parquet / none); got: {msg}"
        );

        // The rule stays narrow: parquet+gzip and csv+none are both valid.
        let still_valid = [
            ("parquet", "    compression: gzip\n", "parquet+gzip must validate"),
            ("csv", "    compression: none\n", "csv+none must validate"),
        ];
        for (format, compression_line, why) in still_valid {
            Config::from_yaml(&yaml(format, compression_line)).expect(why);
        }
    }

    #[test]
    fn audit_csv_every_real_codec_is_rejected() {
        // Every codec other than `none` is silently ignored for CSV, so each
        // one must be refused.
        ["zstd", "snappy", "gzip", "lz4"]
            .into_iter()
            .for_each(|codec| {
                let compression_line = format!("    compression: {codec}\n");
                let msg = rejection_message(&yaml("csv", &compression_line));
                assert!(
                    msg.contains("CSV output does not support compression") && msg.contains(codec),
                    "csv+{codec} must be rejected; got: {msg}"
                );
            });
    }

    #[test]
    fn audit_csv_compression_profile_is_rejected() {
        // Any `compression_profile:` other than `none` resolves to a real
        // codec, which makes it the same silent no-op for CSV.
        for profile in ["fast", "balanced", "compact"] {
            let profile_line = format!("    compression_profile: {profile}\n");
            let msg = rejection_message(&yaml("csv", &profile_line));
            let names_profile = msg.contains("CSV output does not support compression_profile")
                && msg.contains(profile);
            assert!(
                names_profile,
                "csv+profile {profile} must be rejected; got: {msg}"
            );
        }
        // `compression_profile: none` requests nothing, so it is accepted.
        let none_profile = yaml("csv", "    compression_profile: none\n");
        Config::from_yaml(&none_profile)
            .expect("csv + compression_profile: none must validate");
    }

    #[test]
    fn audit_csv_default_compression_still_validates() {
        // Regression guard: a bare `format: csv` with no explicit codec keeps
        // validating. `CompressionType::default()` is `Zstd`, but the user did
        // not ask for it. Only an explicit codec is a no-op request, so the
        // check must look for explicit intent and not at the struct default;
        // the latter would break roughly 60 existing csv configs.
        let bare_csv = yaml("csv", "");
        Config::from_yaml(&bare_csv).expect("bare format: csv must validate");
    }

    #[test]
    fn audit_compression_supported_predicate() {
        // `compression_supported` reaches this module via `pub use format::*`.
        let real_codecs = [
            CompressionType::Zstd,
            CompressionType::Snappy,
            CompressionType::Gzip,
            CompressionType::Lz4,
        ];

        // Parquet accepts every codec, `None` included.
        for ct in real_codecs.into_iter().chain([CompressionType::None]) {
            assert!(compression_supported(FormatType::Parquet, ct));
        }

        // CSV accepts `None` and nothing else.
        assert!(compression_supported(
            FormatType::Csv,
            CompressionType::None
        ));
        for ct in real_codecs {
            assert!(
                !compression_supported(FormatType::Csv, ct),
                "CSV must not claim to support {}",
                ct.label()
            );
        }
    }
}
1098
#[cfg(test)]
mod audit_unquoted_template_brace {
    //! yaml-hint: writing `{partition}` (or `{date}`) unquoted in a path or
    //! prefix value makes serde_yaml_ng read it as a flow mapping. The
    //! resulting parser error never mentions quoting. `{partition}` is the
    //! token `partition_by` requires, so this is an easy copy-paste mistake.
    //! `Config::from_yaml` appends a quoting hint to such errors. These tests
    //! pin that hint and check that valid configs still load unchanged.
    use super::*;

    /// Otherwise-valid config whose `destination.prefix:` line is
    /// `prefix_value` spliced in verbatim, quotes included if any. Every other
    /// line is fixed, so a parse failure can only come from the value under
    /// test.
    fn yaml_with_prefix(prefix_value: &str) -> String {
        format!(
            "source:\n\
             \x20 type: postgres\n\
             \x20 url: \"postgresql://localhost/test\"\n\
             exports:\n\
             \x20 - name: t\n\
             \x20   query: \"SELECT 1\"\n\
             \x20   format: parquet\n\
             \x20   partition_by: created_date\n\
             \x20   destination:\n\
             \x20     type: local\n\
             \x20     path: ./out\n\
             \x20     prefix: {prefix_value}\n"
        )
    }

    const HINT_FRAGMENT: &str =
        "a YAML value containing { } (such as {partition} or {date}) must be quoted";

    /// Load the fixture with `prefix_value`, require that loading fails, and
    /// return the full `{:#}`-rendered error chain.
    fn load_error(prefix_value: &str) -> String {
        let err = Config::from_yaml(&yaml_with_prefix(prefix_value)).unwrap_err();
        format!("{err:#}")
    }

    #[test]
    fn bare_partition_token_gets_quoting_hint() {
        // `prefix: {partition}` is a YAML map, so serde reports
        // `invalid type: map, expected a string` and says nothing about quoting.
        let msg = load_error("{partition}");
        assert!(
            msg.contains(HINT_FRAGMENT),
            "bare {{partition}} must carry the quoting hint; got: {msg}"
        );
        // The hint is added to the parser's own detail (type + location),
        // not substituted for it.
        let original_detail_kept = msg.contains("invalid type: map") || msg.contains("line");
        assert!(
            original_detail_kept,
            "the original parser error must be kept; got: {msg}"
        );
    }

    #[test]
    fn trailing_text_after_brace_gets_quoting_hint() {
        // `{date}/{partition}/` closes the flow map and then runs into block
        // context. libyaml reports `did not find expected key ... while parsing
        // a block mapping`: the same mistake with a different symptom.
        let msg = load_error("{date}/{partition}/");
        assert!(
            msg.contains(HINT_FRAGMENT),
            "{{date}}/{{partition}}/ must carry the quoting hint; got: {msg}"
        );
    }

    #[test]
    fn unclosed_brace_gets_quoting_hint() {
        // An unclosed `{partition` yields the classic flow-mapping scanner
        // error (`did not find expected ',' or '}'`). The hint must still
        // appear.
        let msg = load_error("{partition");
        assert!(
            msg.contains(HINT_FRAGMENT),
            "unclosed brace must carry the quoting hint; got: {msg}"
        );
    }

    #[test]
    fn quoted_brace_value_loads_ok() {
        // Applying the suggested fix — quoting the value — makes the config
        // load, and the prefix comes through intact, braces and all.
        let cfg = Config::from_yaml(&yaml_with_prefix("\"exports/{partition}/\""))
            .expect("quoted {partition} prefix must load");
        let loaded_prefix = cfg.exports[0].destination.prefix.as_deref();
        assert_eq!(loaded_prefix, Some("exports/{partition}/"));
    }

    #[test]
    fn config_without_braces_is_untouched() {
        // A brace-free prefix takes the ordinary success path; the hint logic
        // must not interfere with it.
        let doc = yaml_with_prefix("exports/data/");
        Config::from_yaml(&doc).expect("a brace-free prefix must load");
    }

    // ── line_has_unquoted_brace_value() unit coverage ──────────────────────

    #[test]
    fn unquoted_brace_value_is_detected() {
        let flagged = [
            "    prefix: {partition}",
            "      path: {date}/out",
            "prefix: {partition", // unclosed
        ];
        for line in flagged {
            assert!(line_has_unquoted_brace_value(line), "{line:?} must be flagged");
        }
    }

    #[test]
    fn quoted_brace_value_is_not_flagged() {
        // Quoting the whole value hides the brace from the scanner.
        let quoted = [
            "    prefix: \"exports/{partition}/\"",
            "    prefix: 'data/{date}/'",
        ];
        for line in quoted {
            assert!(
                !line_has_unquoted_brace_value(line),
                "{line:?} must not be flagged"
            );
        }
    }

    #[test]
    fn env_placeholder_and_plain_values_are_not_flagged() {
        let clean = [
            "    url: ${DATABASE_URL}", // env placeholder, resolved before the parse
            "    path: ./out",          // no brace at all
            "  # prefix: {partition}",  // whole-line comment
            "    prefix:",              // key with no value
        ];
        for line in clean {
            assert!(
                !line_has_unquoted_brace_value(line),
                "{line:?} must not be flagged"
            );
        }
    }
}
1224
1225#[cfg(test)]
1226mod sec_config_validation_regression {
1227    //! Regression edge-cases that pin the *compat boundaries* of the
1228    //! config-validation security fixes — the cases that distinguish a real
1229    //! attack from a legitimate loopback / dev-container / Docker pattern.
1230    //! These complement the RED tests in `sec_config_validation`: the RED
1231    //! tests assert the attack is rejected; these assert the fix stays narrow
1232    //! enough not to break local-dev usage (see CRITICAL COMPAT).
1233    use super::*;
1234
1235    /// A full, otherwise-valid config whose single export's `destination:`
1236    /// block is whatever the caller passes verbatim.
1237    fn yaml_with_destination(dest_block: &str) -> String {
1238        format!(
1239            "source:\n  type: postgres\n  url: \"postgresql://localhost/test\"\n\
1240             exports:\n  - name: t\n    query: \"SELECT 1\"\n    format: parquet\n\
1241             {dest_block}"
1242        )
1243    }
1244
1245    // ── endpoint_host / is_loopback_host helpers ─────────────────────────────
1246
1247    #[test]
1248    fn endpoint_host_parses_forms() {
1249        assert_eq!(
1250            endpoint_host("https://attacker.example.com").as_deref(),
1251            Some("attacker.example.com")
1252        );
1253        // Port and path are stripped from the host.
1254        assert_eq!(
1255            endpoint_host("http://127.0.0.1:10000/devstoreaccount1").as_deref(),
1256            Some("127.0.0.1")
1257        );
1258        // userinfo head is dropped (host is after the last `@`).
1259        assert_eq!(
1260            endpoint_host("http://user:pass@127.0.0.1:9000").as_deref(),
1261            Some("127.0.0.1")
1262        );
1263        // Bracketed IPv6 literal keeps its address.
1264        assert_eq!(endpoint_host("http://[::1]:9000").as_deref(), Some("::1"));
1265        // Not a URL → None (treated as a non-loopback custom endpoint upstream).
1266        assert_eq!(endpoint_host("not-a-url"), None);
1267        assert_eq!(endpoint_host("://nohost"), None);
1268    }
1269
1270    #[test]
1271    fn loopback_host_classification() {
1272        for h in ["127.0.0.1", "127.0.0.53", "localhost", "::1"] {
1273            assert!(is_loopback_host(h), "{h} must be loopback");
1274        }
1275        for h in ["attacker.example.com", "evil.com", "10.0.0.1", "::2"] {
1276            assert!(!is_loopback_host(h), "{h} must be remote");
1277        }
1278    }
1279
1280    // ── V2/V12 endpoint: loopback accepted regardless of allow_anonymous ─────
1281
1282    #[test]
1283    fn loopback_endpoint_without_allow_anonymous_still_accepted() {
1284        // A loopback emulator endpoint with credentials (no allow_anonymous) is
1285        // the Minio-with-keys local-dev pattern and must stay accepted — the
1286        // exfil guard targets *remote* hosts, not localhost.
1287        let cfg = yaml_with_destination(
1288            "    destination:\n      type: s3\n      bucket: b\n      region: us-east-1\n\
1289             \x20     endpoint: http://127.0.0.1:9000\n      access_key_env: AK\n      secret_key_env: SK\n",
1290        );
1291        Config::from_yaml(&cfg).expect("loopback endpoint with creds must stay accepted");
1292    }
1293
1294    #[test]
1295    fn remote_https_endpoint_with_allow_anonymous_is_the_only_remote_escape() {
1296        // The documented escape hatch: an explicit anonymous (emulator) opt-in
1297        // permits a non-loopback endpoint (no credentials are sent). Without
1298        // allow_anonymous the same endpoint is rejected (covered by the RED
1299        // test); with it, accepted.
1300        let cfg = yaml_with_destination(
1301            "    destination:\n      type: gcs\n      bucket: b\n\
1302             \x20     endpoint: https://emulator.example.com\n      allow_anonymous: true\n",
1303        );
1304        Config::from_yaml(&cfg).expect("remote endpoint + allow_anonymous opt-in must be accepted");
1305    }
1306
1307    // ── V15 local path: absolute allowed (Docker mount), `..` rejected ───────
1308
1309    #[test]
1310    fn absolute_local_path_is_allowed() {
1311        // `path: /output` is a legitimate Docker volume-mount pattern
1312        // (examples/rivet.yaml) and must keep validating — only `..` climbs are
1313        // the traversal footgun.
1314        let cfg =
1315            yaml_with_destination("    destination:\n      type: local\n      path: /output\n");
1316        Config::from_yaml(&cfg).expect("absolute local path (Docker mount) must validate");
1317    }
1318
1319    #[test]
1320    fn dotdot_in_local_prefix_is_rejected() {
1321        // `prefix` is guarded the same as `path`.
1322        let cfg = yaml_with_destination(
1323            "    destination:\n      type: local\n      path: ./out\n      prefix: a/../b\n",
1324        );
1325        let err = Config::from_yaml(&cfg).unwrap_err();
1326        let msg = format!("{err:#}");
1327        assert!(
1328            msg.contains("prefix") && msg.contains(".."),
1329            "a '..' in the local prefix must be rejected naming prefix/..; got: {msg}"
1330        );
1331    }
1332
1333    // ── V13 TLS: explicit enforced mode + knob rejected; default-mode kept ───
1334
1335    #[test]
1336    fn tls_danger_knob_without_explicit_mode_still_accepted() {
1337        // The dev-container pattern `tls: { accept_invalid_certs: true }` against
1338        // a loopback self-signed cert (e.g. the MSSQL docker container) omits
1339        // `mode:` — there is no *explicit* mode to contradict, so it must keep
1340        // validating. The RED test rejects only the explicit-mode contradiction.
1341        let yaml = "source:\n  type: mssql\n  url: \"sqlserver://sa:pw@127.0.0.1:1433/db\"\n  \
1342                    tls:\n    accept_invalid_certs: true\n\
1343                    exports:\n  - name: t\n    query: \"SELECT 1\"\n    format: parquet\n    \
1344                    destination:\n      type: local\n      path: ./out\n";
1345        Config::from_yaml(yaml)
1346            .expect("dev-container default-mode + accept_invalid_certs must stay accepted");
1347    }
1348
1349    #[test]
1350    fn tls_explicit_verify_ca_plus_invalid_hostnames_rejected() {
1351        // The hostname knob is flagged too, against any explicit enforced mode.
1352        let yaml = "source:\n  type: postgres\n  url: \"postgresql://localhost/test\"\n  \
1353                    tls:\n    mode: verify-ca\n    accept_invalid_hostnames: true\n\
1354                    exports:\n  - name: t\n    query: \"SELECT 1\"\n    format: parquet\n    \
1355                    destination:\n      type: local\n      path: ./out\n";
1356        let err = Config::from_yaml(yaml).unwrap_err();
1357        let msg = format!("{err:#}");
1358        assert!(
1359            msg.contains("accept_invalid_hostnames") && msg.contains("verify-ca"),
1360            "explicit verify-ca + accept_invalid_hostnames must be rejected; got: {msg}"
1361        );
1362    }
1363
1364    #[test]
1365    fn tls_explicit_disable_with_knob_is_not_flagged() {
1366        // `mode: disable` carries no verification promise to contradict, so the
1367        // danger knob is a no-op there and must not be rejected.
1368        let yaml = "source:\n  type: postgres\n  url: \"postgresql://localhost/test\"\n  \
1369                    tls:\n    mode: disable\n    accept_invalid_certs: true\n\
1370                    exports:\n  - name: t\n    query: \"SELECT 1\"\n    format: parquet\n    \
1371                    destination:\n      type: local\n      path: ./out\n";
1372        Config::from_yaml(yaml).expect("mode: disable + knob is a no-op and must validate");
1373    }
1374
1375    // ── V5 name: filename-safe predicate boundaries ──────────────────────────
1376
1377    #[test]
1378    fn filename_safe_name_boundaries() {
1379        for ok in ["t", "orders", "daily_events", "v2-2024", "name.with.dots"] {
1380            assert!(is_filename_safe_name(ok), "{ok:?} must be accepted");
1381        }
1382        for bad in [
1383            "",
1384            "..",
1385            "../x",
1386            "/abs",
1387            "sub/dir",
1388            "back\\slash",
1389            ".hidden",
1390            "with\u{0000}nul",
1391        ] {
1392            assert!(!is_filename_safe_name(bad), "{bad:?} must be rejected");
1393        }
1394    }
1395}
1396
#[cfg(test)]
mod sec_config_validation {
    //! Security regression tests for config-load validation gaps (cluster:
    //! config-validation). Each asserts the SECURE behavior through the
    //! stable `Config::from_yaml` seam: a malicious config must be REJECTED
    //! at config-load, and the legitimate pattern next to it (loopback
    //! emulator endpoint, plain export name) must keep loading.
    //!
    //! The tests were originally written RED, failing against code that
    //! accepted the malicious configs. They are kept as regression guards for
    //! the production fixes so the gaps cannot silently reopen.
    //!
    //! The pattern mirrors the existing `query_file` `..`/absolute-path guard
    //! in `validate_export` (see `config/tests/validation.rs`): a syntactic
    //! check that runs at config-validate time so `rivet check` / `rivet
    //! doctor` catch the problem before any connect/plan/upload step.
    use super::*;

    /// A full, otherwise-valid config whose single export's `destination:`
    /// block is whatever the caller passes verbatim. Only the destination
    /// varies, so any rejection is attributable to the destination under test.
    fn yaml_with_destination(dest_block: &str) -> String {
        format!(
            "source:\n  type: postgres\n  url: \"postgresql://localhost/test\"\n\
             exports:\n  - name: t\n    query: \"SELECT 1\"\n    format: parquet\n\
             {dest_block}"
        )
    }

    /// Load `yaml`, require that config-load rejects it, and return the
    /// alternate-formatted (`{:#}`, full context chain) error text. Callers
    /// use it to check which field the rejection names.
    ///
    /// # Panics
    /// Panics with `why` (suffixed with "; got Ok") when the config is
    /// accepted, i.e. when the security gap has reopened. `#[track_caller]`
    /// makes the panic point at the failing test, not at this helper.
    #[track_caller]
    fn rejection_message(yaml: &str, why: &str) -> String {
        match Config::from_yaml(yaml) {
            Ok(_) => panic!("{why}; got Ok"),
            Err(err) => format!("{err:#}"),
        }
    }

    // ── V2/V12: cloud-endpoint exfiltration + http cleartext ────────────────
    //
    // `destination.endpoint` is handed to the opendal S3/GCS/Azure builder
    // (see `src/destination/{s3,gcs,azure}.rs`). Without config-load
    // validation, a committed config could silently redirect every export to
    // an attacker-controlled host. Two distinct gaps:
    //   V2  — a custom *non-loopback* endpoint (data exfiltration target).
    //   V12 — an `http://` (plaintext) endpoint (credentials + data on the
    //         wire in cleartext).
    // The secure behavior is to reject (or require explicit opt-in) at
    // config-load. Loopback/emulator endpoints (Minio/Azurite/fake-gcs on
    // 127.0.0.1) MUST stay accepted. That path is exercised by the existing
    // `gcs_allow_anonymous_parses` test and the guard test below.

    #[test]
    fn sec_s3_custom_endpoint_rejected() {
        // SEC V2: a non-loopback custom S3 endpoint with no explicit opt-in
        // is an exfiltration target: every part upload would go to
        // attacker.example.com.
        let cfg = yaml_with_destination(
            "    destination:\n      type: s3\n      bucket: my-bucket\n      region: us-east-1\n\
             \x20     endpoint: https://attacker.example.com\n",
        );
        let msg = rejection_message(
            &cfg,
            "a non-loopback custom S3 endpoint (https://attacker.example.com) must be \
             rejected at config-load (data-exfiltration target)",
        );
        assert!(
            msg.contains("endpoint"),
            "rejection must name the offending 'endpoint' field; got: {msg}"
        );
    }

    #[test]
    fn sec_http_endpoint_rejected() {
        // SEC V12: a plaintext http:// endpoint to a *remote* host sends
        // credentials and exported rows over the wire in cleartext. A
        // non-loopback host keeps this distinct from the Minio/Azurite
        // loopback emulator case (guarded below).
        let cfg = yaml_with_destination(
            "    destination:\n      type: s3\n      bucket: my-bucket\n      region: us-east-1\n\
             \x20     endpoint: http://evil.com\n",
        );
        let msg = rejection_message(
            &cfg,
            "a plaintext http:// endpoint to a remote host (http://evil.com) must be \
             rejected at config-load (cleartext credentials + data)",
        );
        assert!(
            msg.contains("endpoint") || msg.to_lowercase().contains("http"),
            "rejection must name the endpoint / cleartext problem; got: {msg}"
        );
    }

    #[test]
    fn sec_loopback_endpoint_still_accepted_guard() {
        // SEC V2/V12 (guard): a loopback emulator endpoint
        // (`http://127.0.0.1:9000` Minio, with allow_anonymous) is the
        // legitimate local-dev path and MUST stay accepted. This pins that
        // the endpoint rejection targets *remote* hosts, not localhost.
        // Otherwise every Minio/Azurite/fake-gcs integration test breaks
        // (see `gcs_allow_anonymous_parses`).
        let cfg = yaml_with_destination(
            "    destination:\n      type: s3\n      bucket: my-bucket\n      region: us-east-1\n\
             \x20     endpoint: http://127.0.0.1:9000\n      allow_anonymous: true\n",
        );
        Config::from_yaml(&cfg)
            .expect("a loopback emulator endpoint with allow_anonymous must stay accepted");
    }

    // ── V5: export `name` path traversal ────────────────────────────────────
    //
    // `ExportConfig.name` is a free-form `String` keyed into state tracking,
    // file logs, and (via the destination layout) output paths. An
    // unvalidated name can escape the intended output tree: one like
    // `../../../etc/x`, an absolute `/abs/x`, a bare slash, or an embedded
    // NUL. Mirror the `query_file` `..`/absolute guard and reject at
    // config-load.

    #[test]
    fn sec_export_name_traversal_rejected() {
        // SEC V5: a traversal / absolute / slash / NUL export name escapes
        // the output tree (and corrupts name-keyed state).
        for bad in ["../../../etc/x", "/abs/x", "sub/dir", "with\u{0000}nul"] {
            // `name:` is JSON-encoded so embedded slashes / NULs survive the
            // YAML parse verbatim and reach validation.
            let name_yaml = serde_json::to_string(bad).expect("encode name");
            let cfg = format!(
                "source:\n  type: postgres\n  url: \"postgresql://localhost/test\"\n\
                 exports:\n  - name: {name_yaml}\n    query: \"SELECT 1\"\n    format: parquet\n\
                 \x20   destination:\n      type: local\n      path: ./out\n"
            );
            let msg = rejection_message(
                &cfg,
                &format!(
                    "export name {bad:?} (traversal/absolute/slash/NUL) must be rejected at \
                     config-load"
                ),
            );
            assert!(
                msg.contains("name"),
                "rejection of name {bad:?} must name the offending 'name' field; got: {msg}"
            );
        }
    }

    #[test]
    fn sec_export_name_normal_still_accepted_guard() {
        // SEC V5 (guard): a plain, well-formed export name must keep
        // loading. Pins that the traversal check is narrow.
        let cfg = yaml_with_destination("    destination:\n      type: local\n      path: ./out\n");
        Config::from_yaml(&cfg).expect("a normal export name ('t') must stay accepted");
    }

    // ── V15: local destination `path` traversal ─────────────────────────────
    //
    // `destination.path` for a `type: local` export is written verbatim to the
    // filesystem. A relative `..` climb such as `../../../../tmp/x` lets a
    // committed config write outside the intended output directory, so it
    // must be rejected at config-load. Absolute paths are deliberately NOT
    // rejected: `path: /output` is the Docker volume-mount pattern (pinned by
    // `absolute_local_path_is_allowed`).

    #[test]
    fn sec_local_dest_path_traversal_rejected() {
        // SEC V15: a traversal local-destination path writes outside the
        // intended output tree.
        let cfg = yaml_with_destination(
            "    destination:\n      type: local\n      path: ../../../../tmp/x\n",
        );
        let msg = rejection_message(
            &cfg,
            "a local destination path containing '..' (../../../../tmp/x) must be rejected \
             at config-load (writes outside the output tree)",
        );
        assert!(
            msg.contains("path") || msg.contains(".."),
            "rejection must name the offending 'path' / traversal; got: {msg}"
        );
    }

    // ── V13: dangerous TLS cert-knob combination ─────────────────────────────
    //
    // `tls: { mode: verify-full, accept_invalid_certs: true }` silently
    // *downgrades* the strongest mode to "accept any cert". `verify-full`
    // promises chain + hostname verification, but the danger knob disables
    // chain verification (see `src/source/tls.rs::build_native_tls`). The
    // secure behavior is a LOUD error at config-load. Only an *explicit*
    // enforced mode is a contradiction. Omitting `mode:`, or using
    // `mode: disable`, must stay accepted (pinned by
    // `tls_danger_knob_without_explicit_mode_still_accepted` and
    // `tls_explicit_disable_with_knob_is_not_flagged`).

    #[test]
    fn sec_accept_invalid_certs_warns() {
        // SEC V13: verify-full + accept_invalid_certs: true is a silent
        // security downgrade that contradicts the chosen mode. No
        // captured-warning seam is reachable from here, so the strongest
        // stable assertion is an `Err` from `Config::from_yaml`.
        let base = yaml_with_destination("    destination:\n      type: local\n      path: ./out\n");
        // Splice the TLS block into the source (right after the `url:` line)
        // rather than the destination, so the rest of the config stays valid.
        let url_line = "  url: \"postgresql://localhost/test\"\n";
        let cfg = base.replacen(
            url_line,
            &format!("{url_line}  tls:\n    mode: verify-full\n    accept_invalid_certs: true\n"),
            1,
        );
        // If the fixture ever drifts so `url_line` no longer matches, the
        // splice is a silent no-op. The failure below would then misleadingly
        // blame TLS validation, so fail loudly on the fixture instead.
        assert_ne!(cfg, base, "TLS block was not spliced into the source section");
        let msg = rejection_message(
            &cfg,
            "tls mode: verify-full with accept_invalid_certs: true is a silent security \
             downgrade and must be loudly surfaced (error) at config-load",
        );
        assert!(
            msg.contains("accept_invalid_certs") || msg.to_lowercase().contains("verify"),
            "the surfaced error must name the dangerous knob / mode contradiction; got: {msg}"
        );
    }
}