Skip to main content

schema_core/config/
secret.rs

1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::common::{ConnectionUrl, ConnectionUrlError, HttpUrl, HttpUrlError, SinkName};
6
/// Name of the reserved environment variable that provides — or overrides —
/// the source connection URL. There is only ever one source, so a single
/// well-known name (following the 12-factor convention) cannot be ambiguous.
pub const SOURCE_URL_VAR: &str = "DATABASE_URL";
11
12/// A value resolved at **runtime**: either a literal baked into the config or a
13/// reference to an environment variable read when the pipeline runs. Deferring
14/// resolution is what lets a compiled config travel without its secrets — a
15/// literal is carried as-is, an [`Env`](Self::Env) reference carries only the
16/// variable name, and the real value is read in the environment that runs it.
17#[derive(Clone, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum Secret {
20    /// A literal value, stored verbatim.
21    Value(String),
22    /// Read from this environment variable at resolution time.
23    Env(String),
24}
25
26impl Secret {
27    /// Read this secret's value from its own source — a literal as-is, an `Env`
28    /// from the environment. Does not consult any reserved variable.
29    fn read(&self) -> Result<String, ResolveError> {
30        match self {
31            Secret::Value(value) => Ok(value.clone()),
32            Secret::Env(var) => {
33                std::env::var(var).map_err(|_| ResolveError::EnvNotSet(var.clone()))
34            }
35        }
36    }
37}
38
39/// Redacted, so a debug-printed config never leaks a literal. An `Env` reference
40/// shows its variable name (not a secret); a literal shows `***`.
41impl fmt::Debug for Secret {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        match self {
44            Secret::Value(_) => write!(f, "Secret(***)"),
45            Secret::Env(var) => write!(f, "Secret(env {var})"),
46        }
47    }
48}
49
50impl fmt::Display for Secret {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        match self {
53            Secret::Value(_) => write!(f, "***"),
54            Secret::Env(var) => write!(f, "${{{var}}}"),
55        }
56    }
57}
58
59/// How the source connection is specified: a full URL (literal or from env) or
60/// the parts to assemble one. Resolution happens at runtime, so a configured
61/// value can be overridden by [`SOURCE_URL_VAR`] in the running environment.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum ConnectionSpec {
65    /// A full connection URL.
66    Url(Secret),
67    /// The parts of a connection URL; `password` may come from the environment.
68    Parts {
69        host: String,
70        port: u16,
71        user: String,
72        #[serde(default, skip_serializing_if = "Option::is_none")]
73        password: Option<Secret>,
74        database: String,
75    },
76}
77
78/// Resolve the source connection URL, with [`SOURCE_URL_VAR`] as the deployment
79/// override. Precedence, highest first:
80///
81/// 1. An explicit `Url(Env)` names its own source and wins — the reserved
82///    variable is not consulted.
83/// 2. [`SOURCE_URL_VAR`], if set — overriding a configured value or filling an
84///    omitted one.
85/// 3. The configured literal URL or assembled parts.
86/// 4. Otherwise an error.
87pub fn resolve_connection_url(
88    spec: Option<&ConnectionSpec>,
89) -> Result<ConnectionUrl, ResolveError> {
90    if let Some(ConnectionSpec::Url(env @ Secret::Env(_))) = spec {
91        return Ok(ConnectionUrl::try_new(env.read()?)?);
92    }
93
94    if let Ok(url) = std::env::var(SOURCE_URL_VAR) {
95        return Ok(ConnectionUrl::try_new(url)?);
96    }
97
98    match spec {
99        Some(ConnectionSpec::Url(secret)) => Ok(ConnectionUrl::try_new(secret.read()?)?),
100        Some(ConnectionSpec::Parts {
101            host,
102            port,
103            user,
104            password,
105            database,
106        }) => Ok(ConnectionUrl::from_parts()
107            .username(user.clone())
108            .host(host.clone())
109            .port(*port)
110            .database(database.clone())
111            .maybe_password(password.as_ref().map(Secret::read).transpose()?)
112            .call()?),
113        None => Err(ResolveError::MissingConnection),
114    }
115}
116
117/// Resolve a **required** sink value, with `reserved` as the deployment override
118/// variable. Same precedence as [`resolve_connection_url`]: an explicit `Env`
119/// reference wins; otherwise `reserved` overrides the literal; otherwise the
120/// literal.
121pub fn resolve_required(secret: &Secret, reserved: &str) -> Result<String, ResolveError> {
122    match secret {
123        env @ Secret::Env(_) => env.read(),
124        Secret::Value(literal) => Ok(literal_or_override(literal, reserved)),
125    }
126}
127
128/// Resolve an **optional** sink value. Same precedence as
129/// [`resolve_required`], plus: when the config omits it, `reserved` fills it if
130/// set, otherwise `None`.
131pub fn resolve_optional(
132    secret: Option<&Secret>,
133    reserved: &str,
134) -> Result<Option<String>, ResolveError> {
135    match secret {
136        Some(env @ Secret::Env(_)) => env.read().map(Some),
137        Some(Secret::Value(literal)) => Ok(Some(literal_or_override(literal, reserved))),
138        None => Ok(std::env::var(reserved).ok()),
139    }
140}
141
/// Pick the effective value for a configured literal: the content of the
/// `reserved` environment variable when it can be read, the literal otherwise.
fn literal_or_override(literal: &str, reserved: &str) -> String {
    match std::env::var(reserved) {
        Ok(overriding) => overriding,
        // Any lookup failure falls back to the configured literal.
        Err(_) => literal.to_owned(),
    }
}
146
147/// The per-sink reserved-variable prefix: the sink's name, uppercased, so
148/// several OpenSearch sinks never collide (`<NAME>_OPENSEARCH_URL`, …).
149pub fn sink_var_prefix(name: &SinkName) -> String {
150    name.to_string().to_uppercase()
151}
152
153#[derive(thiserror::Error, Debug)]
154pub enum ResolveError {
155    #[error("environment variable '{0}' is not set")]
156    EnvNotSet(String),
157    #[error("source has no connection_url and {SOURCE_URL_VAR} is not set")]
158    MissingConnection,
159    #[error("invalid connection URL: {0}")]
160    ConnectionUrl(#[from] ConnectionUrlError),
161    #[error("invalid HTTP URL: {0}")]
162    HttpUrl(#[from] HttpUrlError),
163}
164
165/// Build an `HttpUrl` from a resolved string, mapping the validation error into
166/// a [`ResolveError`]. Used by sink resolution.
167pub fn http_url(value: String) -> Result<HttpUrl, ResolveError> {
168    Ok(HttpUrl::try_new(value)?)
169}