rivet/config/mod.rs
1pub mod cursor;
2mod destination;
3mod export;
4mod format;
5mod lints;
6mod notifications;
7pub mod resolve;
8pub mod schema;
9mod source;
10
11pub use cursor::IncrementalCursorMode;
12pub use destination::*;
13pub use export::*;
14pub use format::*;
15pub use notifications::*;
16#[allow(unused_imports)]
17pub(crate) use resolve::resolve_env_vars;
18pub use resolve::{parse_file_size, resolve_vars};
19pub use schema::generate_config_schema_pretty;
20pub use source::*;
21
22use schemars::JsonSchema;
23use serde::Deserialize;
24
/// Top-level Rivet configuration root.
///
/// Operators write this struct as YAML (typically `rivet.yaml`). The
/// `JsonSchema` derive is the source of truth for the `schemas/rivet.schema.json`
/// artifact and the `rivet schema config` command's output (v0.7.3 P0).
// Maintainer notes below are plain `//` comments on purpose: `///` text is
// picked up by the `JsonSchema` derive as schema descriptions — presumably
// pinned by the committed schema artifact; confirm before converting them.
//
// `deny_unknown_fields`: a top-level key that is not one of the fields below
// fails deserialization instead of being silently dropped.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
    // Required (no `#[serde(default)]`): the source connection block, checked
    // further by `Config::validate_source_connection`.
    pub source: SourceConfig,
    // Required (no `#[serde(default)]`). `Config::validate_exports_list`
    // additionally rejects an empty list and duplicate export names.
    pub exports: Vec<ExportConfig>,
    // Optional; `None` when the key is omitted.
    #[serde(default)]
    pub notifications: Option<NotificationsConfig>,
    // `false` when omitted. NOTE(review): presumably runs exports
    // concurrently — the consumer is not visible in this file; confirm.
    #[serde(default)]
    pub parallel_exports: bool,
    // `false` when omitted. NOTE(review): presumably one process per export —
    // the consumer is not visible in this file; confirm.
    #[serde(default)]
    pub parallel_export_processes: bool,
}
42
43impl Config {
44 pub fn load(path: &str) -> crate::error::Result<Self> {
45 Self::load_with_params(path, None)
46 }
47
48 pub fn load_with_params(
49 path: &str,
50 params: Option<&std::collections::HashMap<String, String>>,
51 ) -> crate::error::Result<Self> {
52 // F11 (0.7.5 audit): raw `std::io::Error` lost the path on
53 // not-found. Wrap with the file path + a hint so the operator
54 // can see *which* config the tool could not open.
55 let contents = std::fs::read_to_string(path).map_err(|e| {
56 if e.kind() == std::io::ErrorKind::NotFound {
57 anyhow::anyhow!(
58 "config file '{}' not found.\n Hint: check the path, or run `rivet init` to generate one.",
59 path
60 )
61 } else {
62 anyhow::anyhow!("cannot read config file '{}': {}", path, e)
63 }
64 })?;
65 // Warn about typo'd `--param` keys once per CLI invocation, using the
66 // un-resolved YAML as the haystack so the placeholders are still there.
67 // We pass the raw `contents` (not `resolved`) on purpose: after
68 // resolution the placeholders are gone, and every key would look unused.
69 resolve::warn_unused_params(&contents, params);
70 let resolved = resolve_vars(&contents, params)?;
71 // F12 (0.7.5 audit): YAML parse errors did not name the config
72 // file. When loading from disk we know the path — thread it
73 // into the parse error.
74 // `.context` (not `anyhow!("…{:#}", e)`, which stringifies into a fresh
75 // error) so a typed `CodedError` raised by validation survives the chain —
76 // `error::error_code` downcasts it for the `[CODE]` prefix. `{e:#}` in
77 // `main` still renders the full "config file '…': <message>" chain.
78 Self::from_yaml(&resolved).map_err(|e| e.context(format!("config file '{}'", path)))
79 }
80
81 pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
82 // Intercept the unquoted-`{partition}` footgun before the raw-YAML
83 // pre-scans below (they `from_str::<Value>` too and would re-emit the
84 // same cryptic libyaml message). A document that does not even parse as
85 // a `Value` is malformed; if the failure looks like a flow-mapping
86 // scanner error *and* the source carries an unquoted brace value, point
87 // straight at the quoting fix. On a valid config this `Value` parse
88 // succeeds and the block is skipped, so the success path is unchanged.
89 if let Err(e) = serde_yaml_ng::from_str::<serde_yaml_ng::Value>(yaml) {
90 let m = e.to_string();
91 if let Some(hint) = Self::unquoted_template_brace_hint(yaml, &m) {
92 return Err(anyhow::anyhow!("{e}\n {hint}"));
93 }
94 // A TAB in indentation is the most common beginner YAML mistake; it
95 // trips libyaml before serde ever sees a field, so it must be caught
96 // here in the raw scan, not in `enhance_parse_error`.
97 if let Some(hint) = Self::tab_indent_hint(yaml, &m) {
98 return Err(anyhow::anyhow!("{e}\n {hint}"));
99 }
100 }
101 Self::check_misplaced_tuning_fields(yaml)?;
102 Self::check_csv_compression(yaml)?;
103 Self::check_tls_mode_downgrade(yaml)?;
104 let config: Config = serde_yaml_ng::from_str(yaml).map_err(|e| {
105 // A well-formed flow map (`prefix: {partition}`) parses as a YAML
106 // value but serde then rejects it with `invalid type: map, expected
107 // a string`. That is the same unquoted-brace footgun, surfacing one
108 // layer later than the scanner errors caught above — so try the same
109 // hint here before falling back to the generic field-typo enhancer.
110 if let Some(hint) = Self::unquoted_template_brace_hint(yaml, &e.to_string()) {
111 anyhow::anyhow!("{e}\n {hint}")
112 } else {
113 lints::enhance_parse_error(e)
114 }
115 })?;
116 config.validate()?;
117 Ok(config)
118 }
119
120 /// Detect the unquoted-`{partition}` (or `{date}`, …) template footgun and
121 /// return an actionable quoting hint, or `None` when the error is unrelated.
122 ///
123 /// A YAML value that *starts* a flow mapping — `prefix: {partition}` — is
124 /// the common copy-paste mistake: `{partition}` is the required token for
125 /// `partition_by`, but unquoted it parses as a YAML map (or, with trailing
126 /// text, trips the libyaml scanner). serde then emits a cryptic
127 /// `did not find expected ',' or '}'` / `while parsing a flow mapping` /
128 /// `invalid type: map, expected a string` with no hint that a pair of
129 /// quotes is the fix.
130 ///
131 /// Two guards keep this from firing on unrelated parse errors: the error
132 /// message must carry one of the flow-mapping symptoms, AND the raw source
133 /// must actually contain an unquoted `{…}` value. Both must hold, so a
134 /// valid config (every brace value quoted) never sees the hint, and a
135 /// genuine map-typed field error elsewhere is left alone.
136 /// A YAML document indented with a TAB trips libyaml with `found character
137 /// that cannot start any token`. Point straight at the fix (spaces, not
138 /// tabs) — but only when the cited line really begins with a tab, so an
139 /// unrelated scanner error is left with its original message.
140 fn tab_indent_hint(yaml: &str, err_msg: &str) -> Option<String> {
141 if !err_msg.contains("tab character") {
142 return None;
143 }
144 // serde_yaml_ng reports `found a tab character … at line N column C …`;
145 // the FIRST `line N` is where the offending tab is.
146 let line_no: usize = err_msg
147 .split_once("line ")
148 .and_then(|(_, rest)| rest.split([' ', ',']).next())
149 .and_then(|n| n.parse().ok())?;
150 let line = yaml.lines().nth(line_no.checked_sub(1)?)?;
151 let leading = &line[..line.len() - line.trim_start().len()];
152 leading.contains('\t').then(|| {
153 format!(
154 "line {line_no} is indented with a TAB — YAML requires spaces. Replace the tab(s) with spaces."
155 )
156 })
157 }
158
159 fn unquoted_template_brace_hint(yaml: &str, err_msg: &str) -> Option<String> {
160 const FLOW_SYMPTOMS: &[&str] = &[
161 "did not find expected ',' or '}'",
162 "while parsing a flow mapping",
163 // A bare `key: {token}` parses as a map, then serde rejects the
164 // map where it wanted a scalar — same root cause, later layer.
165 "invalid type: map, expected a string",
166 // `key: {token}/more` runs the flow map into block context.
167 "did not find expected key",
168 ];
169 if !FLOW_SYMPTOMS.iter().any(|s| err_msg.contains(s)) {
170 return None;
171 }
172 if !yaml.lines().any(line_has_unquoted_brace_value) {
173 return None;
174 }
175 Some(
176 "a YAML value containing { } (such as {partition} or {date}) must be quoted, \
177 e.g. prefix: \"exports/{partition}/\""
178 .to_string(),
179 )
180 }
181
182 /// Reject `format: csv` paired with an explicitly-requested compression
183 /// codec (Finding #10). The CSV writer has no compression encoder, so the
184 /// codec is silently dropped on write while the run manifest still records
185 /// it — a degraded, dishonest no-op. We reject loudly at config-validate
186 /// time so `rivet check` / `rivet doctor` catch it before any run.
187 ///
188 /// This is a raw-YAML scan (like [`Self::check_misplaced_tuning_fields`])
189 /// rather than a `validate_export` check on purpose: `ExportConfig.
190 /// compression` is `#[serde(default)]` and `CompressionType::default()` is
191 /// `Zstd`, so a parsed export cannot distinguish "user asked for zstd" from
192 /// "user omitted the field". Only a user who *wrote* `compression:`/
193 /// `compression_profile:` is asking for something the CSV writer cannot
194 /// honour; the bare-`format: csv` default writes uncompressed and is fine.
195 fn check_csv_compression(yaml: &str) -> crate::error::Result<()> {
196 let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
197 let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) else {
198 return Ok(());
199 };
200 for export in exports {
201 if export.get("format").and_then(|f| f.as_str()) != Some("csv") {
202 continue;
203 }
204 let name = export
205 .get("name")
206 .and_then(|n| n.as_str())
207 .unwrap_or("<unnamed>");
208
209 // Explicit `compression:` codec that the CSV writer cannot apply.
210 // An unrecognised label is left for serde to reject during the real
211 // parse; we only act on a codec we understand and that CSV cannot
212 // honour (everything but `none`).
213 if let Some(codec) = export.get("compression").and_then(|c| c.as_str())
214 && let Some(ct) = CompressionType::from_label(codec)
215 && !format::compression_supported(FormatType::Csv, ct)
216 {
217 anyhow::bail!(
218 "export '{}': CSV output does not support compression: {}. \
219 CSV has no compression encoder, so the codec would be silently dropped \
220 while the manifest records it.\n \
221 Hint: use `format: parquet` for compression, or set `compression: none`.",
222 name,
223 codec,
224 );
225 }
226
227 // A `compression_profile:` other than `none` resolves to a real
228 // codec too (fast→snappy, balanced/compact→zstd) — same no-op.
229 if let Some(profile) = export.get("compression_profile").and_then(|c| c.as_str())
230 && profile != CompressionProfile::None.label()
231 {
232 anyhow::bail!(
233 "export '{}': CSV output does not support compression_profile: {} \
234 (it resolves to a compression codec the CSV writer cannot apply).\n \
235 Hint: use `format: parquet` for compression, or set `compression_profile: none`.",
236 name,
237 profile,
238 );
239 }
240 }
241 Ok(())
242 }
243
244 /// V13: reject a `source.tls` block that pairs an *explicitly chosen*
245 /// enforced `mode:` with a verification-disabling danger knob
246 /// (`accept_invalid_certs` / `accept_invalid_hostnames`). `mode: verify-full`
247 /// promises chain + hostname verification, but the knob silently downgrades
248 /// it to "trust anything" — a MITM exposure that contradicts the stated
249 /// intent (see `src/source/tls.rs::build_native_tls`, whose comment claims
250 /// this is warned about at config-time but is not).
251 ///
252 /// Like [`Self::check_csv_compression`], this is a raw-YAML scan rather than
253 /// a `validate` check on purpose: `TlsMode` is `#[serde(default)]` and the
254 /// default is `VerifyFull`, so a parsed config cannot distinguish "user
255 /// wrote `mode: verify-full`" (a contradiction to flag) from "user omitted
256 /// `mode:`" (the common dev-container case `tls: { accept_invalid_certs:
257 /// true }` against a loopback self-signed cert — which must keep working).
258 /// Only an *explicit* enforced `mode:` next to a danger knob is the footgun.
259 fn check_tls_mode_downgrade(yaml: &str) -> crate::error::Result<()> {
260 let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
261 let Some(tls) = root.get("source").and_then(|s| s.get("tls")) else {
262 return Ok(());
263 };
264
265 // Only an explicitly written `mode:` is a deliberate, contradicted
266 // choice; an omitted mode is the dev-container default path.
267 let Some(mode) = tls.get("mode").and_then(|m| m.as_str()) else {
268 return Ok(());
269 };
270 // `disable` carries no verification promise to contradict; the danger
271 // knobs are a no-op there. Flag only the enforced modes.
272 if mode == "disable" {
273 return Ok(());
274 }
275
276 let knob = if tls
277 .get("accept_invalid_certs")
278 .and_then(|v| v.as_bool())
279 .unwrap_or(false)
280 {
281 Some("accept_invalid_certs")
282 } else if tls
283 .get("accept_invalid_hostnames")
284 .and_then(|v| v.as_bool())
285 .unwrap_or(false)
286 {
287 Some("accept_invalid_hostnames")
288 } else {
289 None
290 };
291
292 if let Some(knob) = knob {
293 anyhow::bail!(
294 "source.tls: {} disables certificate verification, silently downgrading the \
295 chosen `mode: {}` to trust-anything (MITM exposure — credentials and rows \
296 readable/forgeable on the wire).\n \
297 Hint: drop the danger knob and trust a private CA with `tls.ca_file: <pem>`; \
298 only use a danger knob for a loopback self-signed dev container, and then omit \
299 the explicit `mode:` so the contradiction is gone.",
300 knob,
301 mode,
302 );
303 }
304 Ok(())
305 }
306
307 /// Detect tuning-related fields placed directly under `source:` or an
308 /// `exports[]` entry instead of inside the `tuning:` sub-key. Without this
309 /// check serde silently ignores unknown keys and the user gets unexpected
310 /// defaults (e.g. batch_size=10 000 instead of the intended 1 000).
311 fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
312 const TUNING_FIELDS: &[&str] = &[
313 "batch_size",
314 "batch_size_memory_mb",
315 "throttle_ms",
316 "statement_timeout_s",
317 "max_retries",
318 "retry_backoff_ms",
319 "lock_timeout_s",
320 "memory_threshold_mb",
321 "profile",
322 ];
323
324 let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
325
326 if let Some(source) = root.get("source") {
327 let misplaced: Vec<&str> = TUNING_FIELDS
328 .iter()
329 .copied()
330 .filter(|&f| source.get(f).is_some())
331 .collect();
332 if !misplaced.is_empty() {
333 anyhow::bail!(
334 "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
335 Example:\n source:\n tuning:\n {}: <value>",
336 misplaced.join(", "),
337 misplaced[0],
338 );
339 }
340 }
341
342 if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
343 for (i, export) in exports.iter().enumerate() {
344 let name = export
345 .get("name")
346 .and_then(|n| n.as_str())
347 .unwrap_or("<unnamed>");
348 let misplaced: Vec<&str> = TUNING_FIELDS
349 .iter()
350 .copied()
351 .filter(|&f| export.get(f).is_some())
352 .collect();
353 if !misplaced.is_empty() {
354 anyhow::bail!(
355 "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
356 not directly in the export. Example:\n exports:\n - name: {}\n tuning:\n {}: <value>",
357 name,
358 i,
359 misplaced.join(", "),
360 name,
361 misplaced[0],
362 );
363 }
364 }
365 }
366
367 Ok(())
368 }
369
370 /// Reject a config before any plan/connect step. The body is split into
371 /// three cohesive validators so each can be read — and unit-tested — on its
372 /// own: the export-list shape, the source connection block, and the
373 /// per-export rules. The end-to-end surface (`Config::from_yaml`) is
374 /// covered by `config/tests/{validation,secops}.rs`; the split additionally
375 /// lets a rule be exercised directly via `validate_export`.
376 fn validate(&self) -> crate::error::Result<()> {
377 self.validate_exports_list()?;
378 self.validate_source_connection()?;
379 for export in &self.exports {
380 self.validate_export(export)?;
381 }
382 Ok(())
383 }
384
385 /// Whole-config shape: at least one export, names unique.
386 fn validate_exports_list(&self) -> crate::error::Result<()> {
387 // An empty `exports:` list is almost always a typo (wrong config file,
388 // dropped anchor, merged doc with the anchor section missing). Running
389 // with zero exports is a silent no-op that looks like success in CI;
390 // reject fast instead. See QA backlog Task 5.1.
391 if self.exports.is_empty() {
392 crate::config_bail!(
393 crate::error::codes::CONFIG_NO_EXPORTS,
394 "exports: at least one export must be defined (got empty list)"
395 );
396 }
397
398 // Duplicate export names break state tracking: `export_state`,
399 // `file_log`, and `chunk_run` are all keyed by `export_name`, so
400 // two configs with the same name silently share cursor/file-log rows.
401 // QA backlog Task 5.1.
402 let mut seen: std::collections::HashSet<&str> =
403 std::collections::HashSet::with_capacity(self.exports.len());
404 for e in &self.exports {
405 if !seen.insert(e.name.as_str()) {
406 crate::config_bail!(
407 crate::error::codes::CONFIG_DUPLICATE_EXPORT,
408 "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
409 e.name
410 );
411 }
412 }
413 Ok(())
414 }
415
416 /// Source connection block: exactly one connection method, well-formed,
417 /// and the source-level tuning that is shared by every export.
418 fn validate_source_connection(&self) -> crate::error::Result<()> {
419 if let Some(t) = &self.source.tuning
420 && t.batch_size.is_some()
421 && t.batch_size_memory_mb.is_some()
422 {
423 anyhow::bail!(
424 "tuning: batch_size and batch_size_memory_mb are mutually exclusive. \
425 Prefer batch_size_memory_mb (rivet sizes the batch to a memory budget, \
426 adapting to row width); set batch_size only to pin an exact row count."
427 );
428 }
429
430 if !self.source.has_url_fields() && !self.source.has_structured_fields() {
431 // First-run footgun: a config that forgot the source block
432 // entirely. Show the recommended path (`url_env`) up-front;
433 // operators who actually want structured fields know to look
434 // for them.
435 anyhow::bail!(
436 "source: no connection method configured. Add one of:\n url_env: DATABASE_URL (URL from env var — recommended)\n url: 'postgresql://user:pass@host:5432/db' (inline — not recommended for committed configs)\n url_file: /etc/rivet/source.url (URL from file — rotation-friendly)\n host/user/database/... (structured fields under `source:`)"
437 );
438 }
439
440 if self.source.has_url_fields() {
441 let url_count = [
442 &self.source.url,
443 &self.source.url_env,
444 &self.source.url_file,
445 ]
446 .iter()
447 .filter(|u| u.is_some())
448 .count();
449 if url_count > 1 {
450 anyhow::bail!(
451 "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
452 url_count
453 );
454 }
455 }
456
457 if self.source.has_url_fields() && self.source.has_structured_fields() {
458 anyhow::bail!(
459 "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n Hint: remove whichever block you don't want; mixing the two is ambiguous."
460 );
461 }
462
463 if self.source.has_structured_fields() {
464 if self.source.host.is_none() {
465 anyhow::bail!(
466 "source: structured config is missing 'host'.\n Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n Or switch to URL-based config: `url_env: DATABASE_URL`."
467 );
468 }
469 if self.source.user.is_none() {
470 anyhow::bail!(
471 "source: structured config is missing 'user'.\n Hint: add `user: <username>` under `source:` in rivet.yaml."
472 );
473 }
474 if self.source.database.is_none() {
475 anyhow::bail!(
476 "source: structured config is missing 'database'.\n Hint: add `database: <dbname>` under `source:` in rivet.yaml."
477 );
478 }
479 if self.source.password.is_some() && self.source.password_env.is_some() {
480 anyhow::bail!(
481 "source: specify 'password' OR 'password_env', not both.\n Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
482 );
483 }
484 }
485 Ok(())
486 }
487
/// Per-export rules: effective tuning, query source, `query_file` SecOps,
/// destination auth, compression, and the mode/chunk matrix. Takes `&self`
/// because effective tuning merges the source-level block.
///
/// Rules run top to bottom and the first failing rule returns, so statement
/// order decides which message an operator sees when several rules are
/// violated at once. Apart from the `credentials_file` existence probe, every
/// check visible here is a pure inspection of the parsed config.
fn validate_export(&self, export: &ExportConfig) -> crate::error::Result<()> {
    // V5: `name` is keyed into output paths, file logs, and on-disk state,
    // yet is otherwise free-form. A traversal (`../../etc/x`), absolute or
    // slash-bearing (`/abs/x`, `sub/dir`), leading-dot, or NUL-bearing name
    // escapes the intended output tree and corrupts name-keyed state.
    // Mirror the `query_file` `..`/absolute guard: reject at config-load,
    // accepting only a filename-safe charset.
    if !is_filename_safe_name(&export.name) {
        anyhow::bail!(
            "export name '{}' is not filename-safe: it must not be absolute, contain \
             '/', '\\', '..', a NUL, or start with '.' (the name is used in output paths \
             and state keys). Use a plain identifier like `orders` or `daily_events`.",
            // Escaped so control characters (e.g. the NUL case) print visibly.
            export.name.escape_default(),
        );
    }

    // Effective tuning = source-level block merged with the export's own block.
    let merged =
        crate::tuning::merge_tuning_config(self.source.tuning.as_ref(), export.tuning.as_ref());
    if let Some(t) = merged
        && t.batch_size.is_some()
        && t.batch_size_memory_mb.is_some()
    {
        anyhow::bail!(
            "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
            export.name
        );
    }
    // NOTE(review): if the merge keeps both fields when the export block sets
    // both, the check above fires first and this more specific message is
    // never reached — confirm against `merge_tuning_config`.
    if let Some(et) = &export.tuning
        && et.batch_size.is_some()
        && et.batch_size_memory_mb.is_some()
    {
        anyhow::bail!(
            "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
            export.name
        );
    }

    // Exactly one of `query` / `query_file` / `table` must be set.
    let set_count = [
        export.query.is_some(),
        export.query_file.is_some(),
        export.table.is_some(),
    ]
    .iter()
    .filter(|b| **b)
    .count();
    if set_count == 0 {
        anyhow::bail!(
            "export '{}': specify exactly one of 'query', 'query_file', or 'table'. \
             Use table: <name> for a whole table (enables PK auto-chunking); \
             query: \"SELECT …\" for an inline one-liner; \
             query_file: <path> for SQL you keep in version control.",
            export.name
        );
    }
    if set_count > 1 {
        anyhow::bail!(
            "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
            export.name,
            set_count
        );
    }
    // SecOps: syntactic `query_file` checks must run at config-validate
    // time so `rivet check` / `rivet doctor` catch them before any
    // plan step. The same checks repeat (with a canonicalize-based
    // symlink probe) in `ExportConfig::resolve_query` because the
    // file may have been swapped between validation and read.
    if let Some(file) = &export.query_file {
        let p = std::path::Path::new(file);
        if p.is_absolute() {
            anyhow::bail!(
                "export '{}': query_file must be a relative path: '{}'",
                export.name,
                file
            );
        }
        if p.components().any(|c| c == std::path::Component::ParentDir) {
            anyhow::bail!(
                "export '{}': query_file path must not contain '..': '{}'",
                export.name,
                file
            );
        }
    }
    // V2/V12: a custom cloud `endpoint` is handed straight to the opendal
    // S3/GCS/Azure builder with no validation, so a committed config can
    // silently redirect every upload to an attacker host (exfiltration) or
    // send credentials + rows over cleartext `http://`. The legitimate use
    // is a local emulator (Minio / Azurite / fake-gcs on `127.0.0.1`), so
    // accept a loopback host (any scheme), and otherwise accept a remote
    // endpoint only when the operator has explicitly opted into anonymous
    // (emulator) mode. Reject every other custom endpoint at config-load.
    if matches!(
        export.destination.destination_type,
        DestinationType::S3 | DestinationType::Gcs | DestinationType::Azure
    ) && let Some(endpoint) = &export.destination.endpoint
    {
        // Loopback emulator (Minio/Azurite/fake-gcs) is the legitimate
        // local-dev path — accept any scheme. A non-loopback (or
        // unparseable) custom endpoint is only accepted when the operator
        // has explicitly opted into anonymous (emulator) mode, where no
        // credentials are sent. Everything else is rejected.
        let loopback = endpoint_host(endpoint).is_some_and(|host| is_loopback_host(&host));
        if !loopback && !export.destination.allow_anonymous {
            anyhow::bail!(
                "export '{}': destination.endpoint '{}' points at a non-loopback host. \
                 A custom endpoint redirects every upload there — committing one is a \
                 data-exfiltration / cleartext-credential risk.\n \
                 Hint: drop `endpoint:` to use the provider default, point it at a \
                 loopback emulator (e.g. http://127.0.0.1:9000 with allow_anonymous: true \
                 for Minio/Azurite), or set `allow_anonymous: true` for an anonymous \
                 emulator.",
                export.name,
                endpoint,
            );
        }
    }

    // V15: a `type: local` destination `path` (or `prefix`) is written
    // verbatim to the filesystem. A `..` component lets a committed config
    // climb out of the intended output tree (`../../../../tmp/x`) — mirror
    // the `query_file` traversal guard and reject it at config-load.
    //
    // Absolute paths are deliberately *not* rejected: `path: /output` is a
    // legitimate Docker volume-mount pattern (see `examples/rivet.yaml`) and
    // an explicit operator choice, not a hidden escape. The `..` climb is
    // the unambiguous traversal footgun.
    if export.destination.destination_type == DestinationType::Local {
        for (field, value) in [
            ("path", export.destination.path.as_deref()),
            ("prefix", export.destination.prefix.as_deref()),
        ] {
            // An unset field has nothing to check.
            let Some(value) = value else { continue };
            if std::path::Path::new(value)
                .components()
                .any(|c| c == std::path::Component::ParentDir)
            {
                anyhow::bail!(
                    "export '{}': local destination {} must not contain a '..' component: \
                     '{}' (a parent-dir climb writes outside the output tree).",
                    export.name,
                    field,
                    value
                );
            }
        }
    }

    // S3: the two key env names come as a pair, or not at all.
    if export.destination.destination_type == DestinationType::S3 {
        let ak = export.destination.access_key_env.is_some();
        let sk = export.destination.secret_key_env.is_some();
        if ak != sk {
            anyhow::bail!(
                "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
                export.name
            );
        }
    }

    // GCS: anonymous access and a credentials file contradict each other.
    if export.destination.destination_type == DestinationType::Gcs
        && export.destination.allow_anonymous
        && export.destination.credentials_file.is_some()
    {
        anyhow::bail!(
            "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
            export.name
        );
    }

    // Azure: either anonymous with no credentials at all, or `account_name`
    // plus exactly one of `account_key_env` / `sas_token_env`.
    if export.destination.destination_type == DestinationType::Azure {
        let has_name = export.destination.account_name.is_some();
        let has_key = export.destination.account_key_env.is_some();
        let has_sas = export.destination.sas_token_env.is_some();
        if export.destination.allow_anonymous {
            if has_name || has_key || has_sas {
                anyhow::bail!(
                    "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
                    export.name
                );
            }
        } else if has_key && has_sas {
            anyhow::bail!(
                "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
                export.name
            );
        } else if !has_name {
            anyhow::bail!(
                "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
                export.name
            );
        } else if !has_key && !has_sas {
            anyhow::bail!(
                "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
                export.name
            );
        }
    }

    // Filesystem probe at validate time. It is not gated on the destination
    // type: any export that sets `credentials_file` is checked.
    if let Some(cred_path) = &export.destination.credentials_file
        && !std::path::Path::new(cred_path).exists()
    {
        anyhow::bail!(
            "export '{}': credentials_file '{}' does not exist",
            export.name,
            cred_path
        );
    }

    // Only parseability is checked here; the parsed size is discarded and
    // the underlying parse error is replaced by the message below.
    if let Some(ref size_str) = export.max_file_size {
        parse_file_size(size_str).map_err(|_| {
            anyhow::anyhow!(
                "export '{}': invalid max_file_size '{}'",
                export.name,
                size_str
            )
        })?;
    }

    // `compression_level` is range-checked per codec and rejected outright
    // for every codec other than zstd and gzip.
    if let Some(level) = export.compression_level {
        match export.compression {
            CompressionType::Zstd => {
                if !(1..=22).contains(&level) {
                    anyhow::bail!(
                        "export '{}': zstd compression_level must be 1..22, got {}",
                        export.name,
                        level
                    );
                }
            }
            CompressionType::Gzip => {
                if level > 10 {
                    anyhow::bail!(
                        "export '{}': gzip compression_level must be 0..10, got {}",
                        export.name,
                        level
                    );
                }
            }
            _ => {
                anyhow::bail!(
                    "export '{}': compression_level is only supported for zstd and gzip",
                    export.name
                );
            }
        }
    }

    // Mode matrix: each mode has its own required / forbidden fields.
    match export.mode {
        ExportMode::Incremental => {
            if export.cursor_column.is_none() {
                anyhow::bail!(
                    "export '{}': incremental mode requires cursor_column",
                    export.name
                );
            }
            // `cursor_fallback_column` is required by `coalesce` and
            // forbidden with the single-column cursor mode.
            match export.incremental_cursor_mode {
                IncrementalCursorMode::Coalesce => {
                    if export.cursor_fallback_column.is_none() {
                        anyhow::bail!(
                            "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
                            export.name
                        );
                    }
                }
                IncrementalCursorMode::SingleColumn => {
                    if export.cursor_fallback_column.is_some() {
                        anyhow::bail!(
                            "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
                            export.name
                        );
                    }
                }
            }
        }
        ExportMode::Chunked => {
            // `chunk_column` is mandatory unless the user used the `table:`
            // shortcut on a Postgres source — in that case it is auto-resolved
            // from the table's single-integer PK at plan-build time (see
            // `crate::plan::build::resolve_chunk_column`).
            //
            // NOTE(review): the message below offers `chunk_by_key`,
            // `chunk_count` and `chunk_by_days` as alternatives, but this
            // guard only looks at `chunk_column` and `table` — confirm that an
            // export using one of those without `chunk_column`/`table` is
            // meant to be rejected here.
            if export.chunk_column.is_none() && export.table.is_none() {
                anyhow::bail!(
                    "export '{}': chunked mode needs a chunking strategy. Pick one:\n \
                     chunk_column: <int col> range chunks on an integer column (most common)\n \
                     chunk_by_key: <unique col> keyset pagination when there's no integer PK\n \
                     chunk_count: <N> split the range into N equal chunks\n \
                     chunk_by_days: <D> time-bucketed chunks (needs a date/timestamp column)\n \
                     Or use the `table:` shortcut on a single table — rivet auto-resolves the column from the primary key.",
                    export.name
                );
            }
            // chunk_size == 0 would divide the range into zero-width
            // slices and (before the saturating fix in generate_chunks)
            // either infinite-loop or produce no progress. QA backlog
            // Task 5.1.
            if export.chunk_size == 0 {
                anyhow::bail!(
                    "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
                    export.name
                );
            }
            // parallel == 0 means "spawn zero workers". Claiming tasks
            // with no workers stalls the pipeline. QA backlog Task 5.1.
            if export.parallel == 0 {
                anyhow::bail!(
                    "export '{}': chunked mode requires parallel >= 1 (got 0)",
                    export.name
                );
            }
            if let Some(0) = export.chunk_count {
                crate::config_bail!(
                    crate::error::codes::CONFIG_CHUNK_COUNT_INVALID,
                    "export '{}': chunk_count must be >= 1",
                    export.name
                );
            }
            if export.chunk_count.is_some() && export.chunk_dense {
                anyhow::bail!(
                    "export '{}': chunk_count and chunk_dense are mutually exclusive. \
                     Use chunk_count for equal-sized chunks over a sparse key; \
                     use chunk_dense only when the key has no gaps.",
                    export.name
                );
            }
            if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
                anyhow::bail!(
                    "export '{}': chunk_count and chunk_by_days are mutually exclusive. \
                     Use chunk_count: N to split an integer range into N chunks; \
                     use chunk_by_days: D to bucket a date/timestamp column by D-day windows.",
                    export.name
                );
            }
        }
        ExportMode::TimeWindow => {
            if export.time_column.is_none() {
                anyhow::bail!(
                    "export '{}': time_window mode requires time_column",
                    export.name
                );
            }
            if export.days_window.is_none() {
                anyhow::bail!(
                    "export '{}': time_window mode requires days_window",
                    export.name
                );
            }
        }
        // Full mode has no mode-specific requirements.
        ExportMode::Full => {}
        ExportMode::Cdc => {
            if export.table.is_none() {
                anyhow::bail!(
                    "export '{}': cdc mode requires `table:` (the source table to capture)",
                    export.name
                );
            }
            if export.query.is_some() || export.query_file.is_some() {
                anyhow::bail!(
                    "export '{}': cdc mode reads the transaction log, not a query — \
                     remove query/query_file and use `table:`",
                    export.name
                );
            }
        }
    }

    // Cross-mode guards: options that belong to one mode must not appear
    // under any other.
    if export.chunk_dense && export.mode != ExportMode::Chunked {
        anyhow::bail!(
            "export '{}': chunk_dense is only valid with mode: chunked",
            export.name
        );
    }

    if export.cdc.is_some() && export.mode != ExportMode::Cdc {
        anyhow::bail!(
            "export '{}': a `cdc:` block is only valid with `mode: cdc`",
            export.name
        );
    }

    if let Some(days) = export.chunk_by_days {
        if export.mode != ExportMode::Chunked {
            anyhow::bail!(
                "export '{}': chunk_by_days requires mode: chunked",
                export.name
            );
        }
        if export.chunk_dense {
            anyhow::bail!(
                "export '{}': chunk_by_days cannot be combined with chunk_dense",
                export.name
            );
        }
        if days == 0 {
            crate::config_bail!(
                crate::error::codes::CONFIG_CHUNK_BY_DAYS_INVALID,
                "export '{}': chunk_by_days must be at least 1",
                export.name
            );
        }
    }
    Ok(())
}
891}
892
/// True when a single YAML line carries a mapping value (text after `key:`)
/// that contains a `{` outside of any quotes — the unquoted-template-brace
/// shape (`prefix: {partition}`, `path: {date}/out`).
///
/// Quote-aware so a properly quoted value (`prefix: "exports/{partition}/"`)
/// does *not* match, and `$`-prefixed braces (`${VAR}` env placeholders) are
/// ignored — they are resolved before the parse and are not the footgun.
///
/// Two YAML details keep the scan from reporting braces that are not part of
/// an unquoted value:
/// - inside a double-quoted scalar a backslash escapes the next byte, so an
///   embedded `\"` does not close the quote;
/// - a `#` preceded by whitespace (outside quotes) starts an inline comment,
///   so the scan stops there and a brace in the comment is not flagged.
///
/// Returns `false` for whole-line comments, for lines without a `key:`
/// separator, and for empty or comment-only values.
fn line_has_unquoted_brace_value(line: &str) -> bool {
    // Whole-line comments never carry a value — skip before splitting.
    if line.trim_start().starts_with('#') {
        return false;
    }

    // A YAML plain-key separator is a colon followed by whitespace or EOL.
    // End-of-line is modelled as a space so both cases share one test. The
    // colon is ASCII, so `i + 1` is always a valid char boundary.
    let bytes = line.as_bytes();
    let separator = bytes.iter().enumerate().find_map(|(i, &b)| {
        let ends_key =
            b == b':' && bytes.get(i + 1).copied().unwrap_or(b' ').is_ascii_whitespace();
        ends_key.then_some(i + 1)
    });
    let Some(value_start) = separator else {
        return false;
    };

    let value = line[value_start..].trim_start();
    // An empty or comment-only value carries no brace to flag.
    if value.is_empty() || value.starts_with('#') {
        return false;
    }

    let vbytes = value.as_bytes();
    let mut in_single = false;
    let mut in_double = false;
    // Set by a backslash inside double quotes: the next byte is literal.
    let mut escaped = false;
    for (j, &c) in vbytes.iter().enumerate() {
        if escaped {
            escaped = false;
            continue;
        }
        match c {
            // Only double-quoted scalars use backslash escapes; single-quoted
            // ones escape a quote by doubling it, which the toggle below
            // already handles (two toggles cancel out).
            b'\\' if in_double => escaped = true,
            b'\'' if !in_double => in_single = !in_single,
            b'"' if !in_single => in_double = !in_double,
            // Inline comment: nothing after it belongs to the value.
            b'#' if !in_single
                && !in_double
                && j > 0
                && vbytes[j - 1].is_ascii_whitespace() =>
            {
                return false;
            }
            b'{' if !in_single && !in_double => {
                // Ignore `${...}` env placeholders (resolved pre-parse).
                if j > 0 && vbytes[j - 1] == b'$' {
                    continue;
                }
                return true;
            }
            _ => {}
        }
    }
    false
}
946
/// Extract the lower-cased host from a `scheme://host[:port][/path]` endpoint,
/// or `None` when it does not look like a URL.
///
/// SecOps helper for the cloud-`endpoint` exfiltration guard (V2/V12): the host
/// decides whether a custom endpoint is a local emulator (loopback) or a remote
/// redirect target. We reject every non-loopback custom endpoint regardless of
/// scheme (covering both the exfil and the cleartext-`http` gaps), so only the
/// host is needed. We hand-parse rather than pull in a URL crate — the inputs
/// are operator-typed endpoints, not arbitrary URIs. A bracketed IPv6 literal
/// authority (`http://[::1]:9000`) keeps its address so it compares against the
/// loopback list.
///
/// Because the result feeds a security decision, anything ambiguous yields
/// `None` instead of a best-effort host:
/// - an authority containing a backslash — WHATWG-style URL parsers treat `\`
///   like `/` for `http(s)`, so `http://evil.com\@127.0.0.1` would connect to
///   `evil.com` while a naive "host is after the last `@`" reading sees
///   `127.0.0.1`;
/// - a bracketed literal that is unclosed or followed by anything other than
///   `:port` (`[::1`, `[::1]evil.com`).
fn endpoint_host(endpoint: &str) -> Option<String> {
    let (scheme, rest) = endpoint.split_once("://")?;
    if scheme.is_empty() {
        return None;
    }
    // Authority ends at the first `/` (path), `?` (query), or `#` (fragment).
    let authority = rest.split(['/', '?', '#']).next().unwrap_or("");
    // Parsers disagree on where an authority with a backslash ends; refuse to
    // guess (see the doc comment).
    if authority.contains('\\') {
        return None;
    }
    // Any `user[:pass]@` userinfo head is dropped (host is after the last `@`).
    let host_port = authority.rsplit('@').next().unwrap_or("");
    let host = match host_port.strip_prefix('[') {
        Some(bracketed) => {
            // Bracketed IPv6 literal: the address is everything up to the
            // closing `]`, which may only be followed by `:port` or nothing.
            let (address, after) = bracketed.split_once(']')?;
            if !(after.is_empty() || after.starts_with(':')) {
                return None;
            }
            address
        }
        // host[:port] — strip the port suffix.
        None => host_port.split(':').next().unwrap_or(""),
    };
    if host.is_empty() {
        return None;
    }
    Some(host.to_ascii_lowercase())
}
984
/// Reports whether `host` refers to the local machine — the legitimate
/// cloud-emulator target (Minio / Azurite / fake-gcs on `127.0.0.1`). Any
/// other host is remote and therefore a potential exfiltration redirect.
///
/// Accepted: the exact name `localhost`, or a string that parses as an IP
/// literal whose address is loopback. A surrounding `[...]` pair (bracketed
/// IPv6 literal) is tolerated and removed before parsing.
fn is_loopback_host(host: &str) -> bool {
    // The check is deliberately a strict parse, never a prefix match: a lexical
    // `starts_with("127.")` would wave through attacker-controlled DNS names
    // such as `127.attacker.com` or `127.0.0.1.evil.com` (both resolve
    // off-box), turning the credential-exfil gate into a bypass (V2/V12).
    // `localhost` is the single non-IP name that counts as loopback.
    if host == "localhost" {
        return true;
    }
    // Unwrap `[::1]` in case a caller forwards the bracketed form; a string
    // with only one of the two brackets is left untouched and fails to parse.
    let literal = match host
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
    {
        Some(inner) => inner,
        None => host,
    };
    matches!(literal.parse::<std::net::IpAddr>(), Ok(addr) if addr.is_loopback())
}
1006
/// Reports whether `name` can be used as a single file-name component.
///
/// Rejected: the empty string, a leading `.` (hidden / current-dir), any `..`
/// sequence (path traversal), any `/` or `\` (absolute or nested names), and
/// embedded NULs. `ExportConfig.name` is keyed into output paths and on-disk
/// state, so a `../../etc/x` or absolute name escapes the output tree (V5).
fn is_filename_safe_name(name: &str) -> bool {
    // Whole-string shape checks first.
    if name.is_empty() || name.starts_with('.') || name.contains("..") {
        return false;
    }
    // Then the individually forbidden characters.
    !name.chars().any(|c| matches!(c, '/' | '\\' | '\0'))
}
1019
1020#[cfg(test)]
1021mod tests;
1022
#[cfg(test)]
mod audit_csv_compression {
    //! Finding #10: `format: csv` + a compression codec is a silent no-op
    //! (the file stays uncompressed but the manifest records the codec). The
    //! combo must be rejected at config-validate time. These tests encode the
    //! new rule, so reverting the fix turns them red.
    use super::*;

    // Renders a single-export config (postgres source, local destination)
    // with the given `format:` value. `compression_line` is spliced verbatim
    // just before the `destination:` key, so callers pass a complete YAML line
    // including its trailing newline — or an empty string for "no explicit
    // compression setting".
    fn yaml(format: &str, compression_line: &str) -> String {
        format!(
            "source:\n type: postgres\n url: \"postgresql://localhost/test\"\n\
             exports:\n - name: t\n query: \"SELECT 1\"\n format: {format}\n\
             {compression_line} destination:\n type: local\n path: ./out\n"
        )
    }

    #[test]
    fn audit_csv_compression_is_rejected() {
        // csv + gzip → rejected, with an actionable message.
        let err = Config::from_yaml(&yaml("csv", " compression: gzip\n")).unwrap_err();
        // `{:#}` renders the error together with its context chain, so the
        // substring checks below see the full message.
        let msg = format!("{err:#}");
        assert!(
            msg.contains("CSV output does not support compression") && msg.contains("gzip"),
            "csv+gzip must be rejected with an actionable message; got: {msg}"
        );
        assert!(
            msg.contains("parquet") && msg.contains("none"),
            "message must point to the real options (parquet / none); got: {msg}"
        );

        // Guard the boundaries: parquet+gzip and csv+none still validate.
        Config::from_yaml(&yaml("parquet", " compression: gzip\n"))
            .expect("parquet+gzip must validate");
        Config::from_yaml(&yaml("csv", " compression: none\n")).expect("csv+none must validate");
    }

    #[test]
    fn audit_csv_every_real_codec_is_rejected() {
        // Each non-None codec is a silent no-op for CSV — none may slip through.
        for codec in ["zstd", "snappy", "gzip", "lz4"] {
            let err = Config::from_yaml(&yaml("csv", &format!(" compression: {codec}\n")))
                .unwrap_err();
            let msg = format!("{err:#}");
            // The message must name the offending codec, not just the rule.
            assert!(
                msg.contains("CSV output does not support compression") && msg.contains(codec),
                "csv+{codec} must be rejected; got: {msg}"
            );
        }
    }

    #[test]
    fn audit_csv_compression_profile_is_rejected() {
        // A `compression_profile:` other than `none` resolves to a real codec,
        // so it is the same silent no-op for CSV.
        for profile in ["fast", "balanced", "compact"] {
            let err = Config::from_yaml(&yaml(
                "csv",
                &format!(" compression_profile: {profile}\n"),
            ))
            .unwrap_err();
            let msg = format!("{err:#}");
            // The profile rejection uses its own wording ("compression_profile"),
            // distinct from the plain-codec message asserted above.
            assert!(
                msg.contains("CSV output does not support compression_profile")
                    && msg.contains(profile),
                "csv+profile {profile} must be rejected; got: {msg}"
            );
        }
        // profile: none is a no-op request and is fine.
        Config::from_yaml(&yaml("csv", " compression_profile: none\n"))
            .expect("csv + compression_profile: none must validate");
    }

    #[test]
    fn audit_csv_default_compression_still_validates() {
        // Regression guard: a bare `format: csv` (no explicit codec) must keep
        // validating. `CompressionType::default()` is `Zstd`, but the user did
        // not *ask* for it — only an explicit codec is a no-op request. This
        // pins that the fix scans for explicit intent, not the struct default
        // (which would break ~60 existing csv configs).
        Config::from_yaml(&yaml("csv", "")).expect("bare format: csv must validate");
    }

    #[test]
    fn audit_compression_supported_predicate() {
        // `compression_supported` is re-exported via `pub use format::*`.
        // Parquet supports every codec; CSV supports only None.
        for ct in [
            CompressionType::Zstd,
            CompressionType::Snappy,
            CompressionType::Gzip,
            CompressionType::Lz4,
            CompressionType::None,
        ] {
            assert!(compression_supported(FormatType::Parquet, ct));
        }
        assert!(compression_supported(
            FormatType::Csv,
            CompressionType::None
        ));
        // Every real codec must be reported as unsupported for CSV.
        for ct in [
            CompressionType::Zstd,
            CompressionType::Snappy,
            CompressionType::Gzip,
            CompressionType::Lz4,
        ] {
            assert!(
                !compression_supported(FormatType::Csv, ct),
                "CSV must not claim to support {}",
                ct.label()
            );
        }
    }
}
1136
#[cfg(test)]
mod audit_unquoted_template_brace {
    //! yaml-hint: an unquoted `{partition}` (or `{date}`) in a path/prefix
    //! value trips serde_yaml_ng's flow-mapping parser with a cryptic message
    //! that gives no clue the brace needs quoting. Since `{partition}` is the
    //! required token for `partition_by`, this is a common copy-paste footgun.
    //! `Config::from_yaml` augments the parser error with a quoting hint; these
    //! tests pin that behavior (and guard that valid configs are untouched).
    use super::*;

    /// A full, otherwise-valid config whose `prefix:` value is whatever the
    /// caller passes verbatim (quoted or not). Only the `prefix:` line varies,
    /// so any parse error is attributable to the brace under test.
    fn yaml_with_prefix(prefix_value: &str) -> String {
        // `\x20` spells a literal space: a trailing `\` line continuation
        // swallows the next line's leading whitespace, so the YAML indentation
        // has to start with an escape to survive.
        format!(
            "source:\n\
             \x20 type: postgres\n\
             \x20 url: \"postgresql://localhost/test\"\n\
             exports:\n\
             \x20 - name: t\n\
             \x20 query: \"SELECT 1\"\n\
             \x20 format: parquet\n\
             \x20 partition_by: created_date\n\
             \x20 destination:\n\
             \x20 type: local\n\
             \x20 path: ./out\n\
             \x20 prefix: {prefix_value}\n"
        )
    }

    // Substring of the quoting hint that every hinted error must contain.
    const HINT_FRAGMENT: &str =
        "a YAML value containing { } (such as {partition} or {date}) must be quoted";

    #[test]
    fn bare_partition_token_gets_quoting_hint() {
        // `prefix: {partition}` parses as a YAML map, so serde rejects it with
        // `invalid type: map, expected a string` — no clue it's a quoting bug.
        let err = Config::from_yaml(&yaml_with_prefix("{partition}")).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains(HINT_FRAGMENT),
            "bare {{partition}} must carry the quoting hint; got: {msg}"
        );
        // The original parser detail (type + location) is preserved.
        assert!(
            msg.contains("invalid type: map") || msg.contains("line"),
            "the original parser error must be kept; got: {msg}"
        );
    }

    #[test]
    fn trailing_text_after_brace_gets_quoting_hint() {
        // `prefix: {date}/{partition}/` runs the flow map into block context:
        // serde emits `did not find expected key ... while parsing a block
        // mapping`. Same footgun, different libyaml symptom.
        let err = Config::from_yaml(&yaml_with_prefix("{date}/{partition}/")).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains(HINT_FRAGMENT),
            "{{date}}/{{partition}}/ must carry the quoting hint; got: {msg}"
        );
    }

    #[test]
    fn unclosed_brace_gets_quoting_hint() {
        // `prefix: {partition` (unclosed) is the canonical flow-mapping scanner
        // error: `did not find expected ',' or '}' ... while parsing a flow
        // mapping`. The hint must still fire.
        let err = Config::from_yaml(&yaml_with_prefix("{partition")).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains(HINT_FRAGMENT),
            "unclosed brace must carry the quoting hint; got: {msg}"
        );
    }

    #[test]
    fn quoted_brace_value_loads_ok() {
        // The fix itself, applied: a properly quoted brace value parses and
        // validates. This is the guard that the hint never reaches a valid
        // config and the success path is unchanged.
        let cfg = Config::from_yaml(&yaml_with_prefix("\"exports/{partition}/\""))
            .expect("quoted {partition} prefix must load");
        // The braces must arrive in the parsed prefix untouched.
        assert_eq!(
            cfg.exports[0].destination.prefix.as_deref(),
            Some("exports/{partition}/")
        );
    }

    #[test]
    fn config_without_braces_is_untouched() {
        // No brace anywhere: a plain valid config still loads, and an unrelated
        // YAML error elsewhere must not pick up a spurious quoting hint.
        // NOTE(review): only the "still loads" half is asserted here; the
        // "unrelated error gets no hint" half has no assertion in this test.
        Config::from_yaml(&yaml_with_prefix("exports/data/"))
            .expect("a brace-free prefix must load");
    }

    // ── line_has_unquoted_brace_value() unit coverage ──────────────────────

    #[test]
    fn unquoted_brace_value_is_detected() {
        // A brace at the start of the value, one followed by trailing text,
        // and an unclosed one are all flagged.
        assert!(line_has_unquoted_brace_value(" prefix: {partition}"));
        assert!(line_has_unquoted_brace_value(" path: {date}/out"));
        assert!(line_has_unquoted_brace_value("prefix: {partition")); // unclosed
    }

    #[test]
    fn quoted_brace_value_is_not_flagged() {
        // Quotes around the value hide the brace from the scanner — not a bug.
        // Both double- and single-quoted forms are covered.
        assert!(!line_has_unquoted_brace_value(
            " prefix: \"exports/{partition}/\""
        ));
        assert!(!line_has_unquoted_brace_value(" prefix: 'data/{date}/'"));
    }

    #[test]
    fn env_placeholder_and_plain_values_are_not_flagged() {
        // `${VAR}` placeholders are resolved before the parse and are not the
        // footgun; plain brace-free values are obviously fine.
        assert!(!line_has_unquoted_brace_value(" url: ${DATABASE_URL}"));
        assert!(!line_has_unquoted_brace_value(" path: ./out"));
        assert!(!line_has_unquoted_brace_value(" # prefix: {partition}")); // comment
        assert!(!line_has_unquoted_brace_value(" prefix:")); // no value
    }
}
1262
#[cfg(test)]
mod sec_config_validation_regression {
    //! Regression edge-cases that pin the *compat boundaries* of the
    //! config-validation security fixes — the cases that distinguish a real
    //! attack from a legitimate loopback / dev-container / Docker pattern.
    //! These complement the RED tests in `sec_config_validation`: the RED
    //! tests assert the attack is rejected; these assert the fix stays narrow
    //! enough not to break local-dev usage (see CRITICAL COMPAT).
    use super::*;

    /// A full, otherwise-valid config whose single export's `destination:`
    /// block is whatever the caller passes verbatim.
    fn yaml_with_destination(dest_block: &str) -> String {
        // `dest_block` is appended as-is after the export's `format:` line, so
        // callers supply the complete `destination:` block, newlines included.
        format!(
            "source:\n type: postgres\n url: \"postgresql://localhost/test\"\n\
             exports:\n - name: t\n query: \"SELECT 1\"\n format: parquet\n\
             {dest_block}"
        )
    }

    // ── endpoint_host / is_loopback_host helpers ─────────────────────────────

    #[test]
    fn endpoint_host_parses_forms() {
        // Plain host: returned unchanged (already lower-case).
        assert_eq!(
            endpoint_host("https://attacker.example.com").as_deref(),
            Some("attacker.example.com")
        );
        // Port and path are stripped from the host.
        assert_eq!(
            endpoint_host("http://127.0.0.1:10000/devstoreaccount1").as_deref(),
            Some("127.0.0.1")
        );
        // userinfo head is dropped (host is after the last `@`).
        assert_eq!(
            endpoint_host("http://user:pass@127.0.0.1:9000").as_deref(),
            Some("127.0.0.1")
        );
        // Bracketed IPv6 literal keeps its address.
        assert_eq!(endpoint_host("http://[::1]:9000").as_deref(), Some("::1"));
        // Not a URL → None (treated as a non-loopback custom endpoint upstream).
        assert_eq!(endpoint_host("not-a-url"), None);
        // An empty scheme before `://` is rejected as well.
        assert_eq!(endpoint_host("://nohost"), None);
    }

    #[test]
    fn loopback_host_classification() {
        // Loopback: `localhost` plus IPv4/IPv6 loopback literals.
        for h in ["127.0.0.1", "127.0.0.53", "localhost", "::1"] {
            assert!(is_loopback_host(h), "{h} must be loopback");
        }
        // Remote: DNS names, a private-range IPv4 address, a non-loopback IPv6.
        for h in ["attacker.example.com", "evil.com", "10.0.0.1", "::2"] {
            assert!(!is_loopback_host(h), "{h} must be remote");
        }
    }

    // ── V2/V12 endpoint: loopback accepted regardless of allow_anonymous ─────

    #[test]
    fn loopback_endpoint_without_allow_anonymous_still_accepted() {
        // A loopback emulator endpoint with credentials (no allow_anonymous) is
        // the Minio-with-keys local-dev pattern and must stay accepted — the
        // exfil guard targets *remote* hosts, not localhost.
        let cfg = yaml_with_destination(
            " destination:\n type: s3\n bucket: b\n region: us-east-1\n\
             \x20 endpoint: http://127.0.0.1:9000\n access_key_env: AK\n secret_key_env: SK\n",
        );
        Config::from_yaml(&cfg).expect("loopback endpoint with creds must stay accepted");
    }

    #[test]
    fn remote_https_endpoint_with_allow_anonymous_is_the_only_remote_escape() {
        // The documented escape hatch: an explicit anonymous (emulator) opt-in
        // permits a non-loopback endpoint (no credentials are sent). Without
        // allow_anonymous the same endpoint is rejected (covered by the RED
        // test); with it, accepted.
        let cfg = yaml_with_destination(
            " destination:\n type: gcs\n bucket: b\n\
             \x20 endpoint: https://emulator.example.com\n allow_anonymous: true\n",
        );
        Config::from_yaml(&cfg).expect("remote endpoint + allow_anonymous opt-in must be accepted");
    }

    // ── V15 local path: absolute allowed (Docker mount), `..` rejected ───────

    #[test]
    fn absolute_local_path_is_allowed() {
        // `path: /output` is a legitimate Docker volume-mount pattern
        // (examples/rivet.yaml) and must keep validating — only `..` climbs are
        // the traversal footgun.
        let cfg =
            yaml_with_destination(" destination:\n type: local\n path: /output\n");
        Config::from_yaml(&cfg).expect("absolute local path (Docker mount) must validate");
    }

    #[test]
    fn dotdot_in_local_prefix_is_rejected() {
        // `prefix` is guarded the same as `path`.
        let cfg = yaml_with_destination(
            " destination:\n type: local\n path: ./out\n prefix: a/../b\n",
        );
        let err = Config::from_yaml(&cfg).unwrap_err();
        let msg = format!("{err:#}");
        // The message must identify both the field and the offending sequence.
        assert!(
            msg.contains("prefix") && msg.contains(".."),
            "a '..' in the local prefix must be rejected naming prefix/..; got: {msg}"
        );
    }

    // ── V13 TLS: explicit enforced mode + knob rejected; default-mode kept ───

    #[test]
    fn tls_danger_knob_without_explicit_mode_still_accepted() {
        // The dev-container pattern `tls: { accept_invalid_certs: true }` against
        // a loopback self-signed cert (e.g. the MSSQL docker container) omits
        // `mode:` — there is no *explicit* mode to contradict, so it must keep
        // validating. The RED test rejects only the explicit-mode contradiction.
        let yaml = "source:\n type: mssql\n url: \"sqlserver://sa:pw@127.0.0.1:1433/db\"\n \
                    tls:\n accept_invalid_certs: true\n\
                    exports:\n - name: t\n query: \"SELECT 1\"\n format: parquet\n \
                    destination:\n type: local\n path: ./out\n";
        Config::from_yaml(yaml)
            .expect("dev-container default-mode + accept_invalid_certs must stay accepted");
    }

    #[test]
    fn tls_explicit_verify_ca_plus_invalid_hostnames_rejected() {
        // The hostname knob is flagged too, against any explicit enforced mode.
        let yaml = "source:\n type: postgres\n url: \"postgresql://localhost/test\"\n \
                    tls:\n mode: verify-ca\n accept_invalid_hostnames: true\n\
                    exports:\n - name: t\n query: \"SELECT 1\"\n format: parquet\n \
                    destination:\n type: local\n path: ./out\n";
        let err = Config::from_yaml(yaml).unwrap_err();
        let msg = format!("{err:#}");
        // Both halves of the contradiction (knob and mode) must be named.
        assert!(
            msg.contains("accept_invalid_hostnames") && msg.contains("verify-ca"),
            "explicit verify-ca + accept_invalid_hostnames must be rejected; got: {msg}"
        );
    }

    #[test]
    fn tls_explicit_disable_with_knob_is_not_flagged() {
        // `mode: disable` carries no verification promise to contradict, so the
        // danger knob is a no-op there and must not be rejected.
        let yaml = "source:\n type: postgres\n url: \"postgresql://localhost/test\"\n \
                    tls:\n mode: disable\n accept_invalid_certs: true\n\
                    exports:\n - name: t\n query: \"SELECT 1\"\n format: parquet\n \
                    destination:\n type: local\n path: ./out\n";
        Config::from_yaml(yaml).expect("mode: disable + knob is a no-op and must validate");
    }

    // ── V5 name: filename-safe predicate boundaries ──────────────────────────

    #[test]
    fn filename_safe_name_boundaries() {
        // Accepted: plain names, including single interior dots and dashes.
        for ok in ["t", "orders", "daily_events", "v2-2024", "name.with.dots"] {
            assert!(is_filename_safe_name(ok), "{ok:?} must be accepted");
        }
        // Rejected: empty, traversal, absolute, nested, backslash, hidden, NUL.
        for bad in [
            "",
            "..",
            "../x",
            "/abs",
            "sub/dir",
            "back\\slash",
            ".hidden",
            "with\u{0000}nul",
        ] {
            assert!(!is_filename_safe_name(bad), "{bad:?} must be rejected");
        }
    }
}
1434
#[cfg(test)]
mod sec_config_validation {
    //! RED security tests for config-load validation gaps (cluster:
    //! config-validation). Each asserts the SECURE behavior through the
    //! stable `Config::from_yaml` seam: a malicious config that is accepted
    //! today must be REJECTED (or, for warn-only knobs, surfaced as an
    //! error/loud warning) at config-load. These are expected to FAIL until
    //! the corresponding production fix lands.
    //!
    //! The pattern mirrors the existing `query_file` `..`/absolute-path guard
    //! in `validate_export` (see `config/tests/validation.rs`): a syntactic
    //! check that runs at config-validate time so `rivet check` / `rivet
    //! doctor` catch the problem before any connect/plan/upload step.
    // NOTE(review): this file also contains the helpers `endpoint_host`,
    // `is_loopback_host` and `is_filename_safe_name`, which suggests the
    // production fixes have landed. If so, the "accepted today" / "expected to
    // FAIL" / "RED" remarks in this module describe the pre-fix state and
    // should be reworded — confirm before relying on them.
    use super::*;

    /// A full, otherwise-valid config whose single export's `destination:`
    /// block is whatever the caller passes verbatim. Only the destination
    /// varies, so any rejection is attributable to the destination under test.
    fn yaml_with_destination(dest_block: &str) -> String {
        format!(
            "source:\n type: postgres\n url: \"postgresql://localhost/test\"\n\
             exports:\n - name: t\n query: \"SELECT 1\"\n format: parquet\n\
             {dest_block}"
        )
    }

    // ── V2/V12: cloud-endpoint exfiltration + http cleartext ────────────────
    //
    // `destination.endpoint` is passed straight to the opendal S3/GCS/Azure
    // builder with no validation (see `src/destination/{s3,gcs,azure}.rs`),
    // so a committed config can silently redirect every export to an
    // attacker-controlled host. Two distinct gaps:
    // V2 — a custom *non-loopback* endpoint (data exfiltration target).
    // V12 — an `http://` (plaintext) endpoint (credentials + data on the
    // wire in cleartext).
    // The secure behavior is to reject (or require explicit opt-in) at
    // config-load. Loopback/emulator endpoints (Minio/Azurite/fake-gcs on
    // 127.0.0.1) MUST stay accepted — that path is exercised by the existing
    // `gcs_allow_anonymous_parses` test and the guard test below.

    #[test]
    fn sec_s3_custom_endpoint_rejected() {
        // SEC-RED V2: a non-loopback custom S3 endpoint is an exfiltration
        // target — every part upload goes to attacker.example.com. Must be
        // rejected (or require explicit opt-in) at config-load. Accepted today.
        let cfg = yaml_with_destination(
            " destination:\n type: s3\n bucket: my-bucket\n region: us-east-1\n\
             \x20 endpoint: https://attacker.example.com\n",
        );
        let res = Config::from_yaml(&cfg);
        assert!(
            res.is_err(),
            "a non-loopback custom S3 endpoint (https://attacker.example.com) must be \
             rejected at config-load (data-exfiltration target); got Ok"
        );
        // Beyond failing, the error has to point the operator at the field.
        let msg = format!("{:#}", res.unwrap_err());
        assert!(
            msg.contains("endpoint"),
            "rejection must name the offending 'endpoint' field; got: {msg}"
        );
    }

    #[test]
    fn sec_http_endpoint_rejected() {
        // SEC-RED V12: a plaintext http:// endpoint to a *remote* host sends
        // credentials and exported rows over the wire in cleartext. Must be
        // rejected (or require explicit opt-in) at config-load. Accepted today.
        // Use a non-loopback host so this is distinct from the Minio/Azurite
        // loopback emulator case (guarded below).
        let cfg = yaml_with_destination(
            " destination:\n type: s3\n bucket: my-bucket\n region: us-east-1\n\
             \x20 endpoint: http://evil.com\n",
        );
        let res = Config::from_yaml(&cfg);
        assert!(
            res.is_err(),
            "a plaintext http:// endpoint to a remote host (http://evil.com) must be \
             rejected at config-load (cleartext credentials + data); got Ok"
        );
        // Either wording is accepted: naming the field or the cleartext scheme.
        let msg = format!("{:#}", res.unwrap_err());
        assert!(
            msg.contains("endpoint") || msg.to_lowercase().contains("http"),
            "rejection must name the endpoint / cleartext problem; got: {msg}"
        );
    }

    #[test]
    fn sec_loopback_endpoint_still_accepted_guard() {
        // SEC-RED V2/V12 (guard): a loopback emulator endpoint
        // (`http://127.0.0.1:9000` Minio, with allow_anonymous) is the
        // legitimate local-dev path and MUST stay accepted after the fix.
        // This pins that the endpoint rejection targets *remote* hosts, not
        // localhost — otherwise the fix breaks every Minio/Azurite/fake-gcs
        // integration test (see `gcs_allow_anonymous_parses`).
        let cfg = yaml_with_destination(
            " destination:\n type: s3\n bucket: my-bucket\n region: us-east-1\n\
             \x20 endpoint: http://127.0.0.1:9000\n allow_anonymous: true\n",
        );
        Config::from_yaml(&cfg)
            .expect("a loopback emulator endpoint with allow_anonymous must stay accepted");
    }

    // ── V5: export `name` path traversal ────────────────────────────────────
    //
    // `ExportConfig.name` is a free-form `String` keyed into state tracking,
    // file logs, and (via the destination layout) output paths — yet it is
    // never validated. A name like `../../../etc/x`, an absolute `/abs/x`, a
    // bare slash, or an embedded NUL can escape the intended output tree.
    // Mirror the `query_file` `..`/absolute guard: reject at config-load.

    #[test]
    fn sec_export_name_traversal_rejected() {
        // SEC-RED V5: a traversal / absolute / slash / NUL export name escapes
        // the output tree (and corrupts name-keyed state). Must be rejected at
        // config-load. Accepted today.
        for bad in ["../../../etc/x", "/abs/x", "sub/dir", "with\u{0000}nul"] {
            // `name:` is JSON-encoded so embedded slashes / NULs survive the
            // YAML parse verbatim and reach validation.
            let name_yaml = serde_json::to_string(bad).expect("encode name");
            let cfg = format!(
                "source:\n type: postgres\n url: \"postgresql://localhost/test\"\n\
                 exports:\n - name: {name_yaml}\n query: \"SELECT 1\"\n format: parquet\n\
                 \x20 destination:\n type: local\n path: ./out\n"
            );
            let res = Config::from_yaml(&cfg);
            assert!(
                res.is_err(),
                "export name {bad:?} (traversal/absolute/slash/NUL) must be rejected at \
                 config-load; got Ok"
            );
            // NOTE(review): "name" is a loose needle — any error text that
            // happens to contain that substring satisfies it.
            let msg = format!("{:#}", res.unwrap_err());
            assert!(
                msg.contains("name"),
                "rejection of name {bad:?} must name the offending 'name' field; got: {msg}"
            );
        }
    }

    #[test]
    fn sec_export_name_normal_still_accepted_guard() {
        // SEC-RED V5 (guard): a plain, well-formed export name must keep
        // loading after the fix. Pins that the traversal check is narrow.
        let cfg = yaml_with_destination(" destination:\n type: local\n path: ./out\n");
        Config::from_yaml(&cfg).expect("a normal export name ('t') must stay accepted");
    }

    // ── V15: local destination `path` traversal ─────────────────────────────
    //
    // `destination.path` for a `type: local` export is written verbatim to the
    // filesystem. A relative `../../../../tmp/x` or absolute path lets a
    // committed config write outside the intended output directory. Must be
    // rejected (or at minimum loudly surfaced) at config-load. Accepted today.
    // NOTE(review): the test below covers only the `..` case; whether absolute
    // paths are meant to be rejected is not asserted in this module — confirm
    // the intended policy.

    #[test]
    fn sec_local_dest_path_traversal_rejected() {
        // SEC-RED V15: a traversal local-destination path writes outside the
        // intended output tree. Must be rejected at config-load. Accepted today.
        let cfg = yaml_with_destination(
            " destination:\n type: local\n path: ../../../../tmp/x\n",
        );
        let res = Config::from_yaml(&cfg);
        assert!(
            res.is_err(),
            "a local destination path containing '..' (../../../../tmp/x) must be rejected \
             at config-load (writes outside the output tree); got Ok"
        );
        let msg = format!("{:#}", res.unwrap_err());
        assert!(
            msg.contains("path") || msg.contains(".."),
            "rejection must name the offending 'path' / traversal; got: {msg}"
        );
    }

    // ── V13: dangerous TLS cert-knob combination ─────────────────────────────
    //
    // `tls: { mode: verify-full, accept_invalid_certs: true }` silently
    // *downgrades* the strongest mode to "accept any cert" — `verify-full`
    // promises chain + hostname verification, but the danger knob disables
    // chain verification (see `src/source/tls.rs::build_native_tls`). The
    // comment at `src/source/tls.rs:55-56` claims "Each one emits a warning at
    // config-time (see `Config::validate`)" — but `Config::validate` emits no
    // such warning today. The secure behavior is a LOUD error (or surfaced
    // warning) at config-load. No `Err`/warning is produced today, so this is
    // RED.

    #[test]
    fn sec_accept_invalid_certs_warns() {
        // SEC-RED V13: verify-full + accept_invalid_certs: true is a silent
        // security downgrade that contradicts the chosen mode. It must be
        // loudly surfaced at config-load. The only stable secure seam is an
        // `Err` from `Config::from_yaml` (validate returns Ok today, and there
        // is no captured-warning seam exposed from here — see notes). Asserting
        // `Err` is the strongest secure assertion and is RED against current
        // code.
        // NOTE(review): despite the `_warns` suffix, this test asserts a hard
        // error (`res.is_err()`), not a warning.
        let cfg = yaml_with_destination(" destination:\n type: local\n path: ./out\n");
        // Splice the TLS block into the source rather than the destination so
        // the rest of the config stays valid.
        let cfg = cfg.replace(
            " url: \"postgresql://localhost/test\"\n",
            " url: \"postgresql://localhost/test\"\n tls:\n mode: verify-full\n accept_invalid_certs: true\n",
        );
        let res = Config::from_yaml(&cfg);
        assert!(
            res.is_err(),
            "tls mode: verify-full with accept_invalid_certs: true is a silent security \
             downgrade and must be loudly surfaced (error) at config-load; got Ok"
        );
        let msg = format!("{:#}", res.unwrap_err());
        assert!(
            msg.contains("accept_invalid_certs") || msg.to_lowercase().contains("verify"),
            "the surfaced error must name the dangerous knob / mode contradiction; got: {msg}"
        );
    }
}
1648}