schema_core/config/
secret.rs1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::common::{ConnectionUrl, ConnectionUrlError, HttpUrl, HttpUrlError, SinkName};
6
7pub const SOURCE_URL_VAR: &str = "DATABASE_URL";
11
12#[derive(Clone, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum Secret {
20 Value(String),
22 Env(String),
24}
25
26impl Secret {
27 fn read(&self) -> Result<String, ResolveError> {
30 match self {
31 Secret::Value(value) => Ok(value.clone()),
32 Secret::Env(var) => {
33 std::env::var(var).map_err(|_| ResolveError::EnvNotSet(var.clone()))
34 }
35 }
36 }
37}
38
39impl fmt::Debug for Secret {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 Secret::Value(_) => write!(f, "Secret(***)"),
45 Secret::Env(var) => write!(f, "Secret(env {var})"),
46 }
47 }
48}
49
50impl fmt::Display for Secret {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 match self {
53 Secret::Value(_) => write!(f, "***"),
54 Secret::Env(var) => write!(f, "${{{var}}}"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum ConnectionSpec {
65 Url(Secret),
67 Parts {
69 host: String,
70 port: u16,
71 user: String,
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 password: Option<Secret>,
74 database: String,
75 },
76}
77
78pub fn resolve_connection_url(
88 spec: Option<&ConnectionSpec>,
89) -> Result<ConnectionUrl, ResolveError> {
90 if let Some(ConnectionSpec::Url(env @ Secret::Env(_))) = spec {
92 return Ok(ConnectionUrl::try_new(env.read()?)?);
93 }
94
95 if let Ok(url) = std::env::var(SOURCE_URL_VAR) {
98 return Ok(ConnectionUrl::try_new(url)?);
99 }
100
101 match spec {
102 Some(ConnectionSpec::Url(secret)) => Ok(ConnectionUrl::try_new(secret.read()?)?),
103 Some(ConnectionSpec::Parts {
104 host,
105 port,
106 user,
107 password,
108 database,
109 }) => Ok(ConnectionUrl::from_parts()
110 .username(user.clone())
111 .host(host.clone())
112 .port(*port)
113 .database(database.clone())
114 .maybe_password(password.as_ref().map(Secret::read).transpose()?)
115 .call()?),
116 None => Err(ResolveError::MissingConnection),
117 }
118}
119
120pub fn resolve_required(secret: &Secret, reserved: &str) -> Result<String, ResolveError> {
125 match secret {
126 env @ Secret::Env(_) => env.read(),
127 Secret::Value(literal) => Ok(literal_or_override(literal, reserved)),
128 }
129}
130
131pub fn resolve_optional(
135 secret: Option<&Secret>,
136 reserved: &str,
137) -> Result<Option<String>, ResolveError> {
138 match secret {
139 Some(env @ Secret::Env(_)) => env.read().map(Some),
140 Some(Secret::Value(literal)) => Ok(Some(literal_or_override(literal, reserved))),
141 None => Ok(std::env::var(reserved).ok()),
142 }
143}
144
145fn literal_or_override(literal: &str, reserved: &str) -> String {
147 std::env::var(reserved).unwrap_or_else(|_| literal.to_owned())
148}
149
150pub fn sink_var_prefix(name: &SinkName) -> String {
153 name.to_string().to_uppercase()
154}
155
156#[derive(thiserror::Error, Debug)]
157pub enum ResolveError {
158 #[error("environment variable '{0}' is not set")]
159 EnvNotSet(String),
160 #[error("source has no connection_url and {SOURCE_URL_VAR} is not set")]
161 MissingConnection,
162 #[error("invalid connection URL: {0}")]
163 ConnectionUrl(#[from] ConnectionUrlError),
164 #[error("invalid HTTP URL: {0}")]
165 HttpUrl(#[from] HttpUrlError),
166}
167
168pub fn http_url(value: String) -> Result<HttpUrl, ResolveError> {
171 Ok(HttpUrl::try_new(value)?)
172}