schema_core/config/
secret.rs1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::common::{ConnectionUrl, ConnectionUrlError, HttpUrl, HttpUrlError, SinkName};
6
/// Name of the environment variable that [`resolve_connection_url`] consults
/// for a connection URL when the configuration does not itself point at an
/// explicit environment variable.
pub const SOURCE_URL_VAR: &str = "DATABASE_URL";
11
12#[derive(Clone, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum Secret {
20 Value(String),
22 Env(String),
24}
25
26impl Secret {
27 fn read(&self) -> Result<String, ResolveError> {
30 match self {
31 Secret::Value(value) => Ok(value.clone()),
32 Secret::Env(var) => {
33 std::env::var(var).map_err(|_| ResolveError::EnvNotSet(var.clone()))
34 }
35 }
36 }
37}
38
39impl fmt::Debug for Secret {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 Secret::Value(_) => write!(f, "Secret(***)"),
45 Secret::Env(var) => write!(f, "Secret(env {var})"),
46 }
47 }
48}
49
50impl fmt::Display for Secret {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 match self {
53 Secret::Value(_) => write!(f, "***"),
54 Secret::Env(var) => write!(f, "${{{var}}}"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum ConnectionSpec {
65 Url(Secret),
67 Parts {
69 host: String,
70 port: u16,
71 user: String,
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 password: Option<Secret>,
74 database: String,
75 },
76}
77
78pub fn resolve_connection_url(
88 spec: Option<&ConnectionSpec>,
89) -> Result<ConnectionUrl, ResolveError> {
90 if let Some(ConnectionSpec::Url(env @ Secret::Env(_))) = spec {
91 return Ok(ConnectionUrl::try_new(env.read()?)?);
92 }
93
94 if let Ok(url) = std::env::var(SOURCE_URL_VAR) {
95 return Ok(ConnectionUrl::try_new(url)?);
96 }
97
98 match spec {
99 Some(ConnectionSpec::Url(secret)) => Ok(ConnectionUrl::try_new(secret.read()?)?),
100 Some(ConnectionSpec::Parts {
101 host,
102 port,
103 user,
104 password,
105 database,
106 }) => Ok(ConnectionUrl::from_parts()
107 .username(user.clone())
108 .host(host.clone())
109 .port(*port)
110 .database(database.clone())
111 .maybe_password(password.as_ref().map(Secret::read).transpose()?)
112 .call()?),
113 None => Err(ResolveError::MissingConnection),
114 }
115}
116
117pub fn resolve_required(secret: &Secret, reserved: &str) -> Result<String, ResolveError> {
122 match secret {
123 env @ Secret::Env(_) => env.read(),
124 Secret::Value(literal) => Ok(literal_or_override(literal, reserved)),
125 }
126}
127
128pub fn resolve_optional(
132 secret: Option<&Secret>,
133 reserved: &str,
134) -> Result<Option<String>, ResolveError> {
135 match secret {
136 Some(env @ Secret::Env(_)) => env.read().map(Some),
137 Some(Secret::Value(literal)) => Ok(Some(literal_or_override(literal, reserved))),
138 None => Ok(std::env::var(reserved).ok()),
139 }
140}
141
/// Returns the value of the `reserved` environment variable when it can be
/// read, and an owned copy of `literal` otherwise.
fn literal_or_override(literal: &str, reserved: &str) -> String {
    match std::env::var(reserved) {
        Ok(overriding) => overriding,
        Err(_) => literal.to_owned(),
    }
}
146
147pub fn sink_var_prefix(name: &SinkName) -> String {
150 name.to_string().to_uppercase()
151}
152
153#[derive(thiserror::Error, Debug)]
154pub enum ResolveError {
155 #[error("environment variable '{0}' is not set")]
156 EnvNotSet(String),
157 #[error("source has no connection_url and {SOURCE_URL_VAR} is not set")]
158 MissingConnection,
159 #[error("invalid connection URL: {0}")]
160 ConnectionUrl(#[from] ConnectionUrlError),
161 #[error("invalid HTTP URL: {0}")]
162 HttpUrl(#[from] HttpUrlError),
163}
164
165pub fn http_url(value: String) -> Result<HttpUrl, ResolveError> {
168 Ok(HttpUrl::try_new(value)?)
169}