Skip to main content

rivet/config/
source.rs

1//! Source-database connection config: URL/structured fields, TLS, environment hints.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6use super::resolve::resolve_env_vars;
7use crate::tuning::{TuningConfig, TuningProfile};
8
/// Connection settings for the source database (the `source:` section of the config).
///
/// A connection is described either by exactly one URL reference (`url`,
/// `url_env`, `url_file`) or by structured fields (`host`, `user`,
/// `database`, …). `resolve_url` rejects configs that mix the two styles.
// `deny_unknown_fields` makes deserialization fail on any key that is not
// declared below, so a misspelled option is reported instead of ignored.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct SourceConfig {
    /// Database engine. Spelled `type` in the config file.
    #[serde(rename = "type")]
    pub source_type: SourceType,

    /// Inline connection URL. Prefer `url_env` or `url_file` when the URL
    /// carries a password.
    pub url: Option<String>,
    /// Name of the environment variable that holds the connection URL.
    pub url_env: Option<String>,
    /// Path of a file whose contents (surrounding whitespace trimmed) are the
    /// connection URL.
    pub url_file: Option<String>,

    /// Database host (structured config).
    pub host: Option<String>,
    /// Database port (structured config). When omitted the engine default is
    /// used: 5432 for Postgres, 3306 for MySQL, 1433 for MSSQL.
    pub port: Option<u16>,
    /// Database user (structured config).
    pub user: Option<String>,
    /// Plaintext password (structured config). Mutually exclusive with
    /// `password_env`; always stripped by `redact_for_artifact`.
    pub password: Option<String>,
    /// Name of the environment variable that holds the password.
    pub password_env: Option<String>,
    /// Database name (structured config).
    pub database: Option<String>,

    /// Operational profile of the source database.
    ///
    /// Selects the **default** tuning profile when none is explicitly set in
    /// `source.tuning.profile` or `export.tuning.profile`:
    ///
    /// | `environment`           | default profile |
    /// |-------------------------|------------------|
    /// | `production` (default)  | `balanced` (50 ms throttle, 10 k batch, retries) |
    /// | `replica`               | `balanced` |
    /// | `local`                 | `fast` (no throttle, 50 k batch — saves ~30% wall on localhost) |
    ///
    /// Explicit `tuning.profile:` always wins over this hint.
    #[serde(default)]
    pub environment: Option<SourceEnvironment>,

    /// Tuning overrides scoped to this source. See [`TuningConfig`].
    #[serde(default)]
    pub tuning: Option<TuningConfig>,

    /// Transport security settings (ADR: SecOps). When absent, Rivet connects
    /// without TLS — a warning is emitted so operators are aware. See [`TlsConfig`].
    #[serde(default)]
    pub tls: Option<TlsConfig>,
}
49
/// Operational environment of the source database — drives the default tuning
/// profile when none is explicitly set. Opt-in: existing configs without
/// `environment:` continue to use `balanced` as today.
// `rename_all = "lowercase"`: the config values are `local`, `replica` and
// `production`. The mapping to a tuning profile lives in `default_profile`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SourceEnvironment {
    /// Localhost / Docker compose / read-only container — no throttle by default
    /// (compiles to `fast` profile defaults). Use when DB load is not a concern.
    Local,
    /// Read replica — `balanced` default. Same throttle as production, but free
    /// to dial up `tuning.batch_size`.
    Replica,
    /// Live production primary — `balanced` default. Bias toward source-safety.
    Production,
}
65
66impl SourceEnvironment {
67    /// Default tuning profile selected by this environment when the user has
68    /// not set `tuning.profile:` explicitly.
69    pub fn default_profile(self) -> TuningProfile {
70        match self {
71            SourceEnvironment::Local => TuningProfile::Fast,
72            SourceEnvironment::Replica | SourceEnvironment::Production => TuningProfile::Balanced,
73        }
74    }
75}
76
77/// Transport security for the source database connection.
78///
79/// Credentials and exported data cross the wire on every connection; without TLS
80/// they are visible to anyone on the network path (cloud inter-VPC, cross-AZ, or
81/// a compromised upstream). The default for all new connections is
82/// [`TlsMode::Require`] when `tls:` is present; setting `tls: { mode: disable }`
83/// is explicit opt-out.
84///
85/// ```yaml
86/// source:
87///   type: postgres
88///   url_env: DATABASE_URL
89///   tls:
90///     mode: verify-full
91///     ca_file: /etc/ssl/certs/rds-ca-2019-root.pem
92/// ```
93#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
94#[serde(deny_unknown_fields)]
95pub struct TlsConfig {
96    /// Enforcement level. See [`TlsMode`].
97    #[serde(default)]
98    pub mode: TlsMode,
99    /// PEM-encoded CA certificate to trust for server verification. Required
100    /// for [`TlsMode::VerifyCa`] and [`TlsMode::VerifyFull`] against a private CA.
101    pub ca_file: Option<String>,
102    /// Accept certificates not chained to a trusted CA. Dangerous — disables
103    /// server authentication — and only honored when explicitly `true`.
104    #[serde(default)]
105    pub accept_invalid_certs: bool,
106    /// Accept certificates whose subjectAltName does not match the connection
107    /// hostname. Dangerous — disables hostname verification.
108    #[serde(default)]
109    pub accept_invalid_hostnames: bool,
110}
111
/// TLS enforcement mode, mirroring libpq's `sslmode` semantics where possible.
// `rename_all = "kebab-case"`: the config values are `disable`, `require`,
// `verify-ca` and `verify-full`. The `#[default]` variant below is what
// `TlsConfig::mode` falls back to when `mode` is omitted.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "kebab-case")]
pub enum TlsMode {
    /// Plaintext. Use only inside trusted networks (loopback, cgroup-private).
    Disable,
    /// Require a TLS handshake; accept the server certificate without verifying
    /// issuer or hostname. Protects against passive sniffing, not MITM.
    Require,
    /// TLS + verify certificate chains to the configured / system trust store.
    /// Does not check hostname (useful for IP-addressed or internal names).
    VerifyCa,
    /// TLS + verify chain **and** hostname against the server cert's SAN/CN.
    /// Recommended default for production.
    #[default]
    VerifyFull,
}
129
130impl TlsMode {
131    pub fn is_enforced(self) -> bool {
132        !matches!(self, TlsMode::Disable)
133    }
134}
135
136impl SourceConfig {
137    /// Return a copy of this config with **all plaintext credential material stripped**,
138    /// safe to embed in a persisted [`crate::plan::PlanArtifact`] (ADR-0005 PA9).
139    ///
140    /// Redaction rules:
141    /// - `password` → always `None` (plaintext password never leaves the process).
142    /// - `url` containing `user[:password]@` → userinfo segment replaced with `"REDACTED"`.
143    /// - `url_env`, `url_file`, `password_env` — kept (env var **names** and file paths
144    ///   are references, not secrets; `apply` needs them to re-resolve credentials).
145    /// - `host`, `port`, `user`, `database` — kept (structured connection metadata).
146    ///
147    /// If a plaintext `password` or `url` is redacted, callers should surface a warning
148    /// to the operator so env/file-based auth is available at apply time.
149    pub fn redact_for_artifact(&self) -> (Self, bool) {
150        let mut out = self.clone();
151        let mut redacted = false;
152
153        if out.password.is_some() {
154            out.password = None;
155            redacted = true;
156        }
157
158        if let Some(ref raw) = out.url
159            && let Some((userinfo_end, scheme_end)) = find_userinfo(raw)
160        {
161            let mut s = String::with_capacity(raw.len());
162            s.push_str(&raw[..scheme_end]); // "postgresql://"
163            s.push_str("REDACTED");
164            s.push_str(&raw[userinfo_end..]); // "@host:port/db…"
165            out.url = Some(s);
166            redacted = true;
167        }
168
169        (out, redacted)
170    }
171
172    pub(crate) fn has_structured_fields(&self) -> bool {
173        self.host.is_some()
174            || self.user.is_some()
175            || self.database.is_some()
176            || self.password.is_some()
177            || self.password_env.is_some()
178    }
179
180    pub(crate) fn has_url_fields(&self) -> bool {
181        self.url.is_some() || self.url_env.is_some() || self.url_file.is_some()
182    }
183
184    fn build_url_from_fields(&self) -> crate::error::Result<String> {
185        // First-user-friendly errors: name the missing field, suggest a
186        // concrete value, and remind the operator that `url_env` is the
187        // alternative path so they don't bounce.  See
188        // `docs/getting-started.md` for the full onboarding flow.
189        let host = self.host.as_deref().ok_or_else(|| {
190            anyhow::anyhow!(
191                "source: structured config is missing 'host'.\n  Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n  Or switch to URL-based config: `url_env: DATABASE_URL`."
192            )
193        })?;
194        let user = self.user.as_deref().ok_or_else(|| {
195            anyhow::anyhow!(
196                "source: structured config is missing 'user'.\n  Hint: add `user: <username>` under `source:` in rivet.yaml."
197            )
198        })?;
199        let database = self.database.as_deref().ok_or_else(|| {
200            anyhow::anyhow!(
201                "source: structured config is missing 'database'.\n  Hint: add `database: <dbname>` under `source:` in rivet.yaml."
202            )
203        })?;
204
205        // SecOps: keep the plaintext password inside a `Zeroizing<String>` until it
206        // is spliced into the final URL, so the standalone password buffer is
207        // wiped on drop (the final URL still lives as a plain String but is
208        // shorter-lived and dropped by the driver constructor).
209        let password: zeroize::Zeroizing<String> =
210            zeroize::Zeroizing::new(match (&self.password, &self.password_env) {
211                (Some(_), Some(_)) => {
212                    anyhow::bail!("source: specify 'password' or 'password_env', not both");
213                }
214                (Some(p), None) => {
215                    static WARNED: std::sync::Once = std::sync::Once::new();
216                    WARNED.call_once(|| {
217                        log::warn!(
218                            "source config contains plaintext password -- consider using password_env"
219                        );
220                    });
221                    resolve_env_vars(p)?
222                }
223                (None, Some(env)) => std::env::var(env).map_err(|_| {
224                    anyhow::anyhow!(
225                        "source: env var '{0}' is not set (referenced by password_env).\n  Hint: export the value before running, e.g.\n      export {0}='your-database-password'",
226                        env
227                    )
228                })?,
229                (None, None) => String::new(),
230            });
231
232        let default_port = match self.source_type {
233            SourceType::Postgres => 5432,
234            SourceType::Mysql => 3306,
235            SourceType::Mssql => 1433,
236        };
237        let port = self.port.unwrap_or(default_port);
238
239        let scheme = match self.source_type {
240            SourceType::Postgres => "postgresql",
241            SourceType::Mysql => "mysql",
242            SourceType::Mssql => "sqlserver",
243        };
244
245        if password.is_empty() {
246            Ok(format!(
247                "{}://{}@{}:{}/{}",
248                scheme, user, host, port, database
249            ))
250        } else {
251            Ok(format!(
252                "{}://{}:{}@{}:{}/{}",
253                scheme,
254                user,
255                password.as_str(),
256                host,
257                port,
258                database
259            ))
260        }
261    }
262
263    pub fn resolve_url(&self) -> crate::error::Result<String> {
264        if self.has_url_fields() && self.has_structured_fields() {
265            anyhow::bail!(
266                "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n  Hint: remove whichever block you don't want; mixing the two is ambiguous."
267            );
268        }
269
270        if self.has_structured_fields() {
271            return self.build_url_from_fields();
272        }
273
274        // Capture *where* the URL came from so the password warning below
275        // can be specific: scolding an operator who already used
276        // `url_env:` (the recommendation!) for "considering url_env" is
277        // misleading and trains them to tune out our warnings.
278        //
279        // The `EnvVar(&str)` / `File(&str)` payloads are retained for
280        // future use (e.g. mentioning the env-var name in a richer
281        // diagnostic later) — `#[allow(dead_code)]` keeps clippy quiet
282        // while we keep the slot open. Renaming the variants to unit
283        // would lose the documentation that "this came from <name>".
284        #[allow(dead_code)]
285        enum UrlSource<'a> {
286            InlineYaml,
287            EnvVar(&'a str),
288            File(&'a str),
289        }
290        let (raw, source) = match (&self.url, &self.url_env, &self.url_file) {
291            (Some(u), None, None) => (u.clone(), UrlSource::InlineYaml),
292            (None, Some(env), None) => (
293                std::env::var(env).map_err(|_| {
294                    anyhow::anyhow!(
295                        "source: env var '{0}' is not set (referenced by url_env).\n  Hint: export the value before running, e.g.\n      export {0}='postgresql://user:pass@host:5432/dbname'\n  Or change `url_env: {0}` in your config to a different env var name.",
296                        env
297                    )
298                })?,
299                UrlSource::EnvVar(env),
300            ),
301            (None, None, Some(file)) => (
302                std::fs::read_to_string(file)
303                    .map_err(|e| {
304                        anyhow::anyhow!(
305                            "source: cannot read url_file '{}': {}.\n  Hint: ensure the file exists and is readable; the file should contain only the URL on a single line.",
306                            file,
307                            e
308                        )
309                    })?
310                    .trim()
311                    .to_string(),
312                UrlSource::File(file),
313            ),
314            _ => anyhow::bail!(
315                "source: configure exactly one connection method:\n  url_env: DATABASE_URL                          (URL from env var — recommended)\n  url: 'postgresql://user:pass@host:5432/db'      (inline — not recommended for committed configs)\n  url_file: /etc/rivet/source.url                 (URL from file — rotation-friendly)\n  host/user/database/...                          (structured fields under `source:`)"
316            ),
317        };
318
319        let resolved = resolve_env_vars(&raw)?;
320
321        if resolved.contains('@')
322            && resolved.contains(':')
323            && let Some(userinfo) = resolved.split('@').next()
324            && userinfo.contains(':')
325            && !userinfo.ends_with(':')
326        {
327            // `resolve_url` is called from many places per run (plan build,
328            // doctor, every export, every chunk worker). Fire each variant
329            // of this warning exactly once per process so operators see
330            // one clean nudge, not 3-4 stacked copies in stderr.
331            //
332            // Only the InlineYaml case is a real misconfiguration to flag:
333            // the password is sitting in a committed file. EnvVar / File
334            // sources are explicitly the recommended forms — scolding an
335            // operator who already uses them for "considering url_env"
336            // would be a false alarm.
337            match source {
338                UrlSource::InlineYaml => {
339                    static WARNED: std::sync::Once = std::sync::Once::new();
340                    WARNED.call_once(|| {
341                        log::warn!(
342                            "source: inline `url:` in YAML contains a plaintext password — \
343                             move it to `url_env: DATABASE_URL` (or `url_file:`) to keep \
344                             credentials out of committed configs"
345                        );
346                    });
347                }
348                UrlSource::EnvVar(_) | UrlSource::File(_) => {
349                    // The recommended forms — no warning. Operator hygiene
350                    // for shell history / file permissions is out of scope.
351                }
352            }
353        }
354
355        Ok(resolved)
356    }
357}
358
/// Database engine of the source.
///
/// Determines the URL scheme and the default port used when a URL is built
/// from structured fields.
// `rename_all = "lowercase"`: the config values are `postgres`, `mysql` and
// `mssql`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SourceType {
    /// PostgreSQL — scheme `postgresql`, default port 5432.
    Postgres,
    /// MySQL — scheme `mysql`, default port 3306.
    Mysql,
    /// Microsoft SQL Server — scheme `sqlserver`, default port 1433.
    Mssql,
}
366
/// Find the `user[:password]` segment of a `scheme://…` URL.
///
/// Returns `(userinfo_end, userinfo_start)` as byte offsets into `raw`:
/// - `userinfo_start` is the offset just past `"://"`;
/// - `userinfo_end` is the offset of the `@` that closes the userinfo, so
///   `&raw[userinfo_start..userinfo_end]` is the userinfo without the `@`.
///
/// Returns `None` when `raw` has no `"://"` or its authority contains no `@`.
fn find_userinfo(raw: &str) -> Option<(usize, usize)> {
    const SCHEME_SEPARATOR: &str = "://";

    let userinfo_start = raw.find(SCHEME_SEPARATOR)? + SCHEME_SEPARATOR.len();
    let after_scheme = &raw[userinfo_start..];

    // Only the authority may carry userinfo. It stops at the first path,
    // query or fragment delimiter, so an `@` further right (`?foo=a@b`) is
    // never mistaken for the userinfo terminator.
    let authority = match after_scheme.find(|c: char| matches!(c, '/' | '?' | '#')) {
        Some(end) => &after_scheme[..end],
        None => after_scheme,
    };

    // Use the LAST `@` of the authority: a password may contain `@` itself
    // (`user:p@ssw0rd@host`), and cutting at the first one would leave the
    // rest of the password in the persisted plan artifact. This mirrors
    // `redact_pg_url` in state/mod.rs.
    authority
        .rfind('@')
        .map(|at| (userinfo_start + at, userinfo_start))
}
388
#[cfg(test)]
mod tests {
    use super::*;

    // ── TlsMode::is_enforced ────────────────────────────────────────────────

    #[test]
    fn tls_mode_disable_not_enforced() {
        assert!(!TlsMode::Disable.is_enforced());
    }

    #[test]
    fn tls_mode_require_is_enforced() {
        assert!(TlsMode::Require.is_enforced());
        assert!(TlsMode::VerifyCa.is_enforced());
        assert!(TlsMode::VerifyFull.is_enforced());
    }

    // ── SourceConfig::redact_for_artifact ───────────────────────────────────

    /// Build a `SourceConfig` of the given engine with every optional field
    /// unset; each test fills in only what it exercises.
    fn make_source(source_type: SourceType) -> SourceConfig {
        SourceConfig {
            source_type,
            url: None,
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        }
    }

    #[test]
    fn redact_plaintext_password() {
        let mut src = make_source(SourceType::Postgres);
        src.password = Some("s3cr3t".into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(flag, "redaction should be flagged");
        assert!(
            redacted.password.is_none(),
            "plaintext password must be stripped"
        );
    }

    #[test]
    fn redact_url_with_password() {
        let mut src = make_source(SourceType::Postgres);
        src.url = Some("postgresql://user:hunter2@db.example.com:5432/app".into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(flag, "URL redaction flagged");
        let url = redacted.url.unwrap();
        assert!(!url.contains("hunter2"), "password must not appear: {url}");
        assert!(url.contains("REDACTED"), "placeholder must appear: {url}");
        assert!(url.contains("@db.example.com"), "host retained: {url}");
        // Pin the exact shape so scheme, host, port and path are all covered.
        assert_eq!(url, "postgresql://REDACTED@db.example.com:5432/app");
    }

    #[test]
    fn redact_url_without_at_sign_not_flagged() {
        let mut src = make_source(SourceType::Postgres);
        src.url = Some("postgresql://db.example.com:5432/app".into());
        let (_, flag) = src.redact_for_artifact();
        assert!(!flag, "URL with no userinfo must not be flagged");
    }

    #[test]
    fn redact_url_with_at_sign_in_query_not_flagged() {
        // An `@` after the authority is not userinfo; the URL must come back
        // untouched and unflagged.
        let raw = "postgresql://db.example.com:5432/app?application_name=a@b";
        let mut src = make_source(SourceType::Postgres);
        src.url = Some(raw.into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(!flag, "`@` in the query string is not userinfo");
        assert_eq!(redacted.url.as_deref(), Some(raw));
    }

    #[test]
    fn redact_url_with_user_but_no_password_is_flagged() {
        let mut src = make_source(SourceType::Postgres);
        src.url = Some("postgresql://user@db.example.com:5432/app".into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(flag, "bare user@ is still userinfo and gets redacted");
        let url = redacted.url.unwrap();
        assert!(url.contains("REDACTED"), "userinfo replaced: {url}");
        assert!(!url.contains("user@"), "bare username removed: {url}");
    }

    #[test]
    fn redact_env_var_reference_kept_intact() {
        let mut src = make_source(SourceType::Mysql);
        src.url_env = Some("DB_URL".into());
        src.password_env = Some("DB_PASS".into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(!flag, "env var references are not secrets");
        assert_eq!(redacted.url_env.as_deref(), Some("DB_URL"));
        assert_eq!(redacted.password_env.as_deref(), Some("DB_PASS"));
    }

    #[test]
    fn redact_mysql_url_with_password() {
        let mut src = make_source(SourceType::Mysql);
        src.url = Some("mysql://root:pass@127.0.0.1:3306/mydb".into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(flag);
        let url = redacted.url.unwrap();
        assert!(url.contains("REDACTED"), "{url}");
        assert!(!url.contains("pass"), "{url}");
    }

    // ── SourceConfig::resolve_url (structured fields) ───────────────────────

    #[test]
    fn resolve_url_from_structured_fields_postgres() {
        let mut src = make_source(SourceType::Postgres);
        src.host = Some("pg.internal".into());
        src.user = Some("alice".into());
        src.database = Some("warehouse".into());
        src.port = Some(5433);
        let url = src.resolve_url().unwrap();
        assert_eq!(url, "postgresql://alice@pg.internal:5433/warehouse");
    }

    #[test]
    fn resolve_url_from_structured_fields_defaults_port() {
        let mut src = make_source(SourceType::Mysql);
        src.host = Some("my.internal".into());
        src.user = Some("bob".into());
        src.database = Some("orders".into());
        let url = src.resolve_url().unwrap();
        assert_eq!(url, "mysql://bob@my.internal:3306/orders");
    }

    #[test]
    fn resolve_url_direct_url_passthrough() {
        let mut src = make_source(SourceType::Postgres);
        src.url = Some("postgresql://carol@pg.example.com:5432/db".into());
        let url = src.resolve_url().unwrap();
        assert_eq!(url, "postgresql://carol@pg.example.com:5432/db");
    }

    #[test]
    fn resolve_url_rejects_mixed_url_and_structured() {
        let mut src = make_source(SourceType::Postgres);
        src.url = Some("postgresql://carol@pg.example.com/db".into());
        src.host = Some("other".into());
        let err = src.resolve_url().unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("URL-based") || msg.contains("structured"),
            "{msg}"
        );
    }

    #[test]
    fn resolve_url_rejects_missing_host() {
        let mut src = make_source(SourceType::Postgres);
        src.user = Some("alice".into());
        src.database = Some("warehouse".into());
        let err = src.resolve_url().unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("host"), "{msg}");
    }

    // ── find_userinfo ────────────────────────────────────────────────────────

    #[test]
    fn find_userinfo_detects_password_in_url() {
        let url = "postgresql://user:pass@host/db";
        let result = find_userinfo(url);
        assert!(result.is_some(), "should detect user:pass@");
        // Tuple order is (userinfo_end, userinfo_start).
        assert_eq!(result, Some((22, 13)));
        assert_eq!(&url[13..22], "user:pass");
    }

    #[test]
    fn find_userinfo_no_password_no_at_returns_none() {
        assert!(find_userinfo("postgresql://host/db").is_none());
    }

    #[test]
    fn find_userinfo_user_only_at_sign_matches() {
        let url = "postgresql://user@host/db";
        assert!(find_userinfo(url).is_some(), "bare user@ should match");
        assert_eq!(find_userinfo(url), Some((17, 13)));
    }

    #[test]
    fn find_userinfo_no_at_sign_returns_none() {
        assert!(find_userinfo("postgresql://db.example.com:5432/app").is_none());
    }

    #[test]
    fn find_userinfo_ignores_at_sign_after_authority() {
        // `@` in the path, query or fragment is not a userinfo terminator.
        assert_eq!(find_userinfo("postgresql://host/a@b"), None);
        assert_eq!(find_userinfo("postgresql://host/db?email=a@b"), None);
        assert_eq!(find_userinfo("postgresql://host?email=a@b"), None);
        assert_eq!(find_userinfo("postgresql://host#a@b"), None);
    }

    #[test]
    fn find_userinfo_without_scheme_returns_none() {
        assert_eq!(find_userinfo("user:pass@host/db"), None);
        assert_eq!(find_userinfo(""), None);
    }

    // ── SEC-RED: embedded `@` in password must not leak to plan artifact ──────

    #[test]
    fn sec_artifact_redaction_password_with_at() {
        // Regression guard for SEC-RED V7. `redact_for_artifact` relies on
        // `find_userinfo` when building the persisted plan JSON. If the
        // userinfo were terminated at the FIRST `@`, then for
        // `postgresql://rivet:p@ssw0rd@host/db` the cut would land right
        // after `p`, the rewrite would emit
        // `postgresql://REDACTED@ssw0rd@host/db`, and the password tail
        // `ssw0rd` would round-trip into the artifact. `find_userinfo`
        // therefore terminates at the LAST `@` before the path (rfind
        // semantics, matching `redact_pg_url` in state/mod.rs); this test
        // keeps it that way.
        let mut src = make_source(SourceType::Postgres);
        src.url = Some("postgresql://rivet:p@ssw0rd@db.example.com:5432/orders".into());
        let (redacted, flag) = src.redact_for_artifact();
        assert!(flag, "URL with userinfo must be flagged as redacted");
        let url = redacted.url.expect("url retained after redaction");
        assert!(
            !url.contains("ssw0rd"),
            "password tail after embedded @ must not leak into artifact: {url}"
        );
        assert!(
            !url.contains("p@ssw0rd"),
            "full password must not leak into artifact: {url}"
        );
        assert!(url.contains("REDACTED"), "placeholder must appear: {url}");
        assert!(
            url.contains("@db.example.com:5432/orders"),
            "host and path must be retained: {url}"
        );
    }
}