Skip to main content

rivet/config/
source.rs

1//! Source-database connection config: URL/structured fields, TLS, environment hints.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6use super::resolve::resolve_env_vars;
7use crate::tuning::{TuningConfig, TuningProfile};
8
9#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
10#[serde(deny_unknown_fields)]
11pub struct SourceConfig {
12    #[serde(rename = "type")]
13    pub source_type: SourceType,
14
15    pub url: Option<String>,
16    pub url_env: Option<String>,
17    pub url_file: Option<String>,
18
19    pub host: Option<String>,
20    pub port: Option<u16>,
21    pub user: Option<String>,
22    pub password: Option<String>,
23    pub password_env: Option<String>,
24    pub database: Option<String>,
25
26    /// Operational profile of the source database.
27    ///
28    /// Selects the **default** tuning profile when none is explicitly set in
29    /// `source.tuning.profile` or `export.tuning.profile`:
30    ///
31    /// | `environment`           | default profile |
32    /// |-------------------------|------------------|
33    /// | `production` (default)  | `balanced` (50 ms throttle, 10 k batch, retries) |
34    /// | `replica`               | `balanced` |
35    /// | `local`                 | `fast` (no throttle, 50 k batch — saves ~30% wall on localhost) |
36    ///
37    /// Explicit `tuning.profile:` always wins over this hint.
38    #[serde(default)]
39    pub environment: Option<SourceEnvironment>,
40
41    #[serde(default)]
42    pub tuning: Option<TuningConfig>,
43
44    /// Transport security settings (ADR: SecOps). When absent, Rivet connects
45    /// without TLS — a warning is emitted so operators are aware. See [`TlsConfig`].
46    #[serde(default)]
47    pub tls: Option<TlsConfig>,
48}
49
50/// Operational environment of the source database — drives the default tuning
51/// profile when none is explicitly set. Opt-in: existing configs without
52/// `environment:` continue to use `balanced` as today.
53#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
54#[serde(rename_all = "lowercase")]
55pub enum SourceEnvironment {
56    /// Localhost / Docker compose / read-only container — no throttle by default
57    /// (compiles to `fast` profile defaults). Use when DB load is not a concern.
58    Local,
59    /// Read replica — `balanced` default. Same throttle as production, but free
60    /// to dial up `tuning.batch_size`.
61    Replica,
62    /// Live production primary — `balanced` default. Bias toward source-safety.
63    Production,
64}
65
66impl SourceEnvironment {
67    /// Default tuning profile selected by this environment when the user has
68    /// not set `tuning.profile:` explicitly.
69    pub fn default_profile(self) -> TuningProfile {
70        match self {
71            SourceEnvironment::Local => TuningProfile::Fast,
72            SourceEnvironment::Replica | SourceEnvironment::Production => TuningProfile::Balanced,
73        }
74    }
75}
76
77/// Transport security for the source database connection.
78///
79/// Credentials and exported data cross the wire on every connection; without TLS
80/// they are visible to anyone on the network path (cloud inter-VPC, cross-AZ, or
81/// a compromised upstream). The default for all new connections is
82/// [`TlsMode::Require`] when `tls:` is present; setting `tls: { mode: disable }`
83/// is explicit opt-out.
84///
85/// ```yaml
86/// source:
87///   type: postgres
88///   url_env: DATABASE_URL
89///   tls:
90///     mode: verify-full
91///     ca_file: /etc/ssl/certs/rds-ca-2019-root.pem
92/// ```
93#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
94#[serde(deny_unknown_fields)]
95pub struct TlsConfig {
96    /// Enforcement level. See [`TlsMode`].
97    #[serde(default)]
98    pub mode: TlsMode,
99    /// PEM-encoded CA certificate to trust for server verification. Required
100    /// for [`TlsMode::VerifyCa`] and [`TlsMode::VerifyFull`] against a private CA.
101    pub ca_file: Option<String>,
102    /// Accept certificates not chained to a trusted CA. Dangerous — disables
103    /// server authentication — and only honored when explicitly `true`.
104    #[serde(default)]
105    pub accept_invalid_certs: bool,
106    /// Accept certificates whose subjectAltName does not match the connection
107    /// hostname. Dangerous — disables hostname verification.
108    #[serde(default)]
109    pub accept_invalid_hostnames: bool,
110}
111
112/// TLS enforcement mode, mirroring libpq's `sslmode` semantics where possible.
113#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
114#[serde(rename_all = "kebab-case")]
115pub enum TlsMode {
116    /// Plaintext. Use only inside trusted networks (loopback, cgroup-private).
117    Disable,
118    /// Require a TLS handshake; accept the server certificate without verifying
119    /// issuer or hostname. Protects against passive sniffing, not MITM.
120    Require,
121    /// TLS + verify certificate chains to the configured / system trust store.
122    /// Does not check hostname (useful for IP-addressed or internal names).
123    VerifyCa,
124    /// TLS + verify chain **and** hostname against the server cert's SAN/CN.
125    /// Recommended default for production.
126    #[default]
127    VerifyFull,
128}
129
130impl TlsMode {
131    pub fn is_enforced(self) -> bool {
132        !matches!(self, TlsMode::Disable)
133    }
134}
135
136impl SourceConfig {
137    /// Return a copy of this config with **all plaintext credential material stripped**,
138    /// safe to embed in a persisted [`crate::plan::PlanArtifact`] (ADR-0005 PA9).
139    ///
140    /// Redaction rules:
141    /// - `password` → always `None` (plaintext password never leaves the process).
142    /// - `url` containing `user[:password]@` → userinfo segment replaced with `"REDACTED"`.
143    /// - `url_env`, `url_file`, `password_env` — kept (env var **names** and file paths
144    ///   are references, not secrets; `apply` needs them to re-resolve credentials).
145    /// - `host`, `port`, `user`, `database` — kept (structured connection metadata).
146    ///
147    /// If a plaintext `password` or `url` is redacted, callers should surface a warning
148    /// to the operator so env/file-based auth is available at apply time.
149    pub fn redact_for_artifact(&self) -> (Self, bool) {
150        let mut out = self.clone();
151        let mut redacted = false;
152
153        if out.password.is_some() {
154            out.password = None;
155            redacted = true;
156        }
157
158        if let Some(ref raw) = out.url
159            && let Some((userinfo_end, scheme_end)) = find_userinfo(raw)
160        {
161            let mut s = String::with_capacity(raw.len());
162            s.push_str(&raw[..scheme_end]); // "postgresql://"
163            s.push_str("REDACTED");
164            s.push_str(&raw[userinfo_end..]); // "@host:port/db…"
165            out.url = Some(s);
166            redacted = true;
167        }
168
169        (out, redacted)
170    }
171
172    pub(crate) fn has_structured_fields(&self) -> bool {
173        self.host.is_some()
174            || self.user.is_some()
175            || self.database.is_some()
176            || self.password.is_some()
177            || self.password_env.is_some()
178    }
179
180    pub(crate) fn has_url_fields(&self) -> bool {
181        self.url.is_some() || self.url_env.is_some() || self.url_file.is_some()
182    }
183
184    fn build_url_from_fields(&self) -> crate::error::Result<String> {
185        // First-user-friendly errors: name the missing field, suggest a
186        // concrete value, and remind the operator that `url_env` is the
187        // alternative path so they don't bounce.  See
188        // `docs/getting-started.md` for the full onboarding flow.
189        let host = self.host.as_deref().ok_or_else(|| {
190            anyhow::anyhow!(
191                "source: structured config is missing 'host'.\n  Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n  Or switch to URL-based config: `url_env: DATABASE_URL`."
192            )
193        })?;
194        let user = self.user.as_deref().ok_or_else(|| {
195            anyhow::anyhow!(
196                "source: structured config is missing 'user'.\n  Hint: add `user: <username>` under `source:` in rivet.yaml."
197            )
198        })?;
199        let database = self.database.as_deref().ok_or_else(|| {
200            anyhow::anyhow!(
201                "source: structured config is missing 'database'.\n  Hint: add `database: <dbname>` under `source:` in rivet.yaml."
202            )
203        })?;
204
205        // SecOps: keep the plaintext password inside a `Zeroizing<String>` until it
206        // is spliced into the final URL, so the standalone password buffer is
207        // wiped on drop (the final URL still lives as a plain String but is
208        // shorter-lived and dropped by the driver constructor).
209        let password: zeroize::Zeroizing<String> =
210            zeroize::Zeroizing::new(match (&self.password, &self.password_env) {
211                (Some(_), Some(_)) => {
212                    anyhow::bail!("source: specify 'password' or 'password_env', not both");
213                }
214                (Some(p), None) => {
215                    static WARNED: std::sync::Once = std::sync::Once::new();
216                    WARNED.call_once(|| {
217                        log::warn!(
218                            "source config contains plaintext password -- consider using password_env"
219                        );
220                    });
221                    resolve_env_vars(p)?
222                }
223                (None, Some(env)) => std::env::var(env).map_err(|_| {
224                    anyhow::anyhow!(
225                        "source: env var '{0}' is not set (referenced by password_env).\n  Hint: export the value before running, e.g.\n      export {0}='your-database-password'",
226                        env
227                    )
228                })?,
229                (None, None) => String::new(),
230            });
231
232        let default_port = match self.source_type {
233            SourceType::Postgres => 5432,
234            SourceType::Mysql => 3306,
235        };
236        let port = self.port.unwrap_or(default_port);
237
238        let scheme = match self.source_type {
239            SourceType::Postgres => "postgresql",
240            SourceType::Mysql => "mysql",
241        };
242
243        if password.is_empty() {
244            Ok(format!(
245                "{}://{}@{}:{}/{}",
246                scheme, user, host, port, database
247            ))
248        } else {
249            Ok(format!(
250                "{}://{}:{}@{}:{}/{}",
251                scheme,
252                user,
253                password.as_str(),
254                host,
255                port,
256                database
257            ))
258        }
259    }
260
261    pub fn resolve_url(&self) -> crate::error::Result<String> {
262        if self.has_url_fields() && self.has_structured_fields() {
263            anyhow::bail!(
264                "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n  Hint: remove whichever block you don't want; mixing the two is ambiguous."
265            );
266        }
267
268        if self.has_structured_fields() {
269            return self.build_url_from_fields();
270        }
271
272        let raw = match (&self.url, &self.url_env, &self.url_file) {
273            (Some(u), None, None) => u.clone(),
274            (None, Some(env), None) => std::env::var(env).map_err(|_| {
275                anyhow::anyhow!(
276                    "source: env var '{0}' is not set (referenced by url_env).\n  Hint: export the value before running, e.g.\n      export {0}='postgresql://user:pass@host:5432/dbname'\n  Or change `url_env: {0}` in your config to a different env var name.",
277                    env
278                )
279            })?,
280            (None, None, Some(file)) => std::fs::read_to_string(file)
281                .map_err(|e| {
282                    anyhow::anyhow!(
283                        "source: cannot read url_file '{}': {}.\n  Hint: ensure the file exists and is readable; the file should contain only the URL on a single line.",
284                        file,
285                        e
286                    )
287                })?
288                .trim()
289                .to_string(),
290            _ => anyhow::bail!(
291                "source: configure exactly one connection method:\n  url_env: DATABASE_URL                          (URL from env var — recommended)\n  url: 'postgresql://user:pass@host:5432/db'      (inline — not recommended for committed configs)\n  url_file: /etc/rivet/source.url                 (URL from file — rotation-friendly)\n  host/user/database/...                          (structured fields under `source:`)"
292            ),
293        };
294
295        let resolved = resolve_env_vars(&raw)?;
296
297        if resolved.contains('@')
298            && resolved.contains(':')
299            && let Some(userinfo) = resolved.split('@').next()
300            && userinfo.contains(':')
301            && !userinfo.ends_with(':')
302        {
303            // `resolve_url` is called from many places per run (plan build,
304            // doctor, every export, every chunk worker).  Fire this warning
305            // exactly once per process so operators see one clean nudge,
306            // not 3-4 stacked copies in stderr.
307            static WARNED: std::sync::Once = std::sync::Once::new();
308            WARNED.call_once(|| {
309                log::warn!(
310                    "source URL contains plaintext password -- consider using url_env or url_file"
311                );
312            });
313        }
314
315        Ok(resolved)
316    }
317}
318
319#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
320#[serde(rename_all = "lowercase")]
321pub enum SourceType {
322    Postgres,
323    Mysql,
324}
325
326/// Locate `user[:password]@` userinfo inside a standard URL.
327///
328/// Returns `(userinfo_end_index, scheme_end_index)` where:
329/// - `scheme_end_index` points right after `"://"` (start of userinfo)
330/// - `userinfo_end_index` points at the `@` separator (exclusive of `@`)
331///
332/// Returns `None` if the URL has no userinfo segment.
333fn find_userinfo(raw: &str) -> Option<(usize, usize)> {
334    let scheme = raw.find("://")? + 3;
335    let rest = &raw[scheme..];
336    let at = rest.find('@')?;
337    // `@` must appear before the path/query start so we don't match `?foo=a@b` etc.
338    if let Some(path) = rest.find('/')
339        && path < at
340    {
341        return None;
342    }
343    Some((scheme + at, scheme))
344}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349
350    // ── TlsMode::is_enforced ────────────────────────────────────────────────
351
352    #[test]
353    fn tls_mode_disable_not_enforced() {
354        assert!(!TlsMode::Disable.is_enforced());
355    }
356
357    #[test]
358    fn tls_mode_require_is_enforced() {
359        assert!(TlsMode::Require.is_enforced());
360        assert!(TlsMode::VerifyCa.is_enforced());
361        assert!(TlsMode::VerifyFull.is_enforced());
362    }
363
364    // ── SourceConfig::redact_for_artifact ───────────────────────────────────
365
366    fn make_source(source_type: SourceType) -> SourceConfig {
367        SourceConfig {
368            source_type,
369            url: None,
370            url_env: None,
371            url_file: None,
372            host: None,
373            port: None,
374            user: None,
375            password: None,
376            password_env: None,
377            database: None,
378            environment: None,
379            tuning: None,
380            tls: None,
381        }
382    }
383
384    #[test]
385    fn redact_plaintext_password() {
386        let mut src = make_source(SourceType::Postgres);
387        src.password = Some("s3cr3t".into());
388        let (redacted, flag) = src.redact_for_artifact();
389        assert!(flag, "redaction should be flagged");
390        assert!(
391            redacted.password.is_none(),
392            "plaintext password must be stripped"
393        );
394    }
395
396    #[test]
397    fn redact_url_with_password() {
398        let mut src = make_source(SourceType::Postgres);
399        src.url = Some("postgresql://user:hunter2@db.example.com:5432/app".into());
400        let (redacted, flag) = src.redact_for_artifact();
401        assert!(flag, "URL redaction flagged");
402        let url = redacted.url.unwrap();
403        assert!(!url.contains("hunter2"), "password must not appear: {url}");
404        assert!(url.contains("REDACTED"), "placeholder must appear: {url}");
405        assert!(url.contains("@db.example.com"), "host retained: {url}");
406    }
407
408    #[test]
409    fn redact_url_without_at_sign_not_flagged() {
410        let mut src = make_source(SourceType::Postgres);
411        src.url = Some("postgresql://db.example.com:5432/app".into());
412        let (_, flag) = src.redact_for_artifact();
413        assert!(!flag, "URL with no userinfo must not be flagged");
414    }
415
416    #[test]
417    fn redact_url_with_user_but_no_password_is_flagged() {
418        let mut src = make_source(SourceType::Postgres);
419        src.url = Some("postgresql://user@db.example.com:5432/app".into());
420        let (redacted, flag) = src.redact_for_artifact();
421        assert!(flag, "bare user@ is still userinfo and gets redacted");
422        let url = redacted.url.unwrap();
423        assert!(url.contains("REDACTED"), "userinfo replaced: {url}");
424        assert!(!url.contains("user@"), "bare username removed: {url}");
425    }
426
427    #[test]
428    fn redact_env_var_reference_kept_intact() {
429        let mut src = make_source(SourceType::Mysql);
430        src.url_env = Some("DB_URL".into());
431        src.password_env = Some("DB_PASS".into());
432        let (redacted, flag) = src.redact_for_artifact();
433        assert!(!flag, "env var references are not secrets");
434        assert_eq!(redacted.url_env.as_deref(), Some("DB_URL"));
435        assert_eq!(redacted.password_env.as_deref(), Some("DB_PASS"));
436    }
437
438    #[test]
439    fn redact_mysql_url_with_password() {
440        let mut src = make_source(SourceType::Mysql);
441        src.url = Some("mysql://root:pass@127.0.0.1:3306/mydb".into());
442        let (redacted, flag) = src.redact_for_artifact();
443        assert!(flag);
444        let url = redacted.url.unwrap();
445        assert!(url.contains("REDACTED"), "{url}");
446        assert!(!url.contains("pass"), "{url}");
447    }
448
449    // ── SourceConfig::resolve_url (structured fields) ───────────────────────
450
451    #[test]
452    fn resolve_url_from_structured_fields_postgres() {
453        let mut src = make_source(SourceType::Postgres);
454        src.host = Some("pg.internal".into());
455        src.user = Some("alice".into());
456        src.database = Some("warehouse".into());
457        src.port = Some(5433);
458        let url = src.resolve_url().unwrap();
459        assert_eq!(url, "postgresql://alice@pg.internal:5433/warehouse");
460    }
461
462    #[test]
463    fn resolve_url_from_structured_fields_defaults_port() {
464        let mut src = make_source(SourceType::Mysql);
465        src.host = Some("my.internal".into());
466        src.user = Some("bob".into());
467        src.database = Some("orders".into());
468        let url = src.resolve_url().unwrap();
469        assert_eq!(url, "mysql://bob@my.internal:3306/orders");
470    }
471
472    #[test]
473    fn resolve_url_direct_url_passthrough() {
474        let mut src = make_source(SourceType::Postgres);
475        src.url = Some("postgresql://carol@pg.example.com:5432/db".into());
476        let url = src.resolve_url().unwrap();
477        assert_eq!(url, "postgresql://carol@pg.example.com:5432/db");
478    }
479
480    #[test]
481    fn resolve_url_rejects_mixed_url_and_structured() {
482        let mut src = make_source(SourceType::Postgres);
483        src.url = Some("postgresql://carol@pg.example.com/db".into());
484        src.host = Some("other".into());
485        let err = src.resolve_url().unwrap_err();
486        let msg = format!("{err:#}");
487        assert!(
488            msg.contains("URL-based") || msg.contains("structured"),
489            "{msg}"
490        );
491    }
492
493    #[test]
494    fn resolve_url_rejects_missing_host() {
495        let mut src = make_source(SourceType::Postgres);
496        src.user = Some("alice".into());
497        src.database = Some("warehouse".into());
498        let err = src.resolve_url().unwrap_err();
499        let msg = format!("{err:#}");
500        assert!(msg.contains("host"), "{msg}");
501    }
502
503    // ── find_userinfo ────────────────────────────────────────────────────────
504
505    #[test]
506    fn find_userinfo_detects_password_in_url() {
507        let url = "postgresql://user:pass@host/db";
508        let result = find_userinfo(url);
509        assert!(result.is_some(), "should detect user:pass@");
510    }
511
512    #[test]
513    fn find_userinfo_no_password_no_at_returns_none() {
514        assert!(find_userinfo("postgresql://host/db").is_none());
515    }
516
517    #[test]
518    fn find_userinfo_user_only_at_sign_matches() {
519        let url = "postgresql://user@host/db";
520        assert!(find_userinfo(url).is_some(), "bare user@ should match");
521    }
522
523    #[test]
524    fn find_userinfo_no_at_sign_returns_none() {
525        assert!(find_userinfo("postgresql://db.example.com:5432/app").is_none());
526    }
527}