use std::fmt;
use serde::{Deserialize, Serialize};
use crate::common::{ConnectionUrl, ConnectionUrlError, HttpUrl, HttpUrlError, SinkName};
pub const SOURCE_URL_VAR: &str = "DATABASE_URL";
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Secret {
Value(String),
Env(String),
}
impl Secret {
fn read(&self) -> Result<String, ResolveError> {
match self {
Secret::Value(value) => Ok(value.clone()),
Secret::Env(var) => {
std::env::var(var).map_err(|_| ResolveError::EnvNotSet(var.clone()))
}
}
}
}
impl fmt::Debug for Secret {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Secret::Value(_) => write!(f, "Secret(***)"),
Secret::Env(var) => write!(f, "Secret(env {var})"),
}
}
}
impl fmt::Display for Secret {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Secret::Value(_) => write!(f, "***"),
Secret::Env(var) => write!(f, "${{{var}}}"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConnectionSpec {
Url(Secret),
Parts {
host: String,
port: u16,
user: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
password: Option<Secret>,
database: String,
},
}
pub fn resolve_connection_url(
spec: Option<&ConnectionSpec>,
) -> Result<ConnectionUrl, ResolveError> {
if let Some(ConnectionSpec::Url(env @ Secret::Env(_))) = spec {
return Ok(ConnectionUrl::try_new(env.read()?)?);
}
if let Ok(url) = std::env::var(SOURCE_URL_VAR) {
return Ok(ConnectionUrl::try_new(url)?);
}
match spec {
Some(ConnectionSpec::Url(secret)) => Ok(ConnectionUrl::try_new(secret.read()?)?),
Some(ConnectionSpec::Parts {
host,
port,
user,
password,
database,
}) => Ok(ConnectionUrl::from_parts()
.username(user.clone())
.host(host.clone())
.port(*port)
.database(database.clone())
.maybe_password(password.as_ref().map(Secret::read).transpose()?)
.call()?),
None => Err(ResolveError::MissingConnection),
}
}
pub fn resolve_required(secret: &Secret, reserved: &str) -> Result<String, ResolveError> {
match secret {
env @ Secret::Env(_) => env.read(),
Secret::Value(literal) => Ok(literal_or_override(literal, reserved)),
}
}
pub fn resolve_optional(
secret: Option<&Secret>,
reserved: &str,
) -> Result<Option<String>, ResolveError> {
match secret {
Some(env @ Secret::Env(_)) => env.read().map(Some),
Some(Secret::Value(literal)) => Ok(Some(literal_or_override(literal, reserved))),
None => Ok(std::env::var(reserved).ok()),
}
}
fn literal_or_override(literal: &str, reserved: &str) -> String {
std::env::var(reserved).unwrap_or_else(|_| literal.to_owned())
}
pub fn sink_var_prefix(name: &SinkName) -> String {
name.to_string().to_uppercase()
}
#[derive(thiserror::Error, Debug)]
pub enum ResolveError {
#[error("environment variable '{0}' is not set")]
EnvNotSet(String),
#[error("source has no connection_url and {SOURCE_URL_VAR} is not set")]
MissingConnection,
#[error("invalid connection URL: {0}")]
ConnectionUrl(#[from] ConnectionUrlError),
#[error("invalid HTTP URL: {0}")]
HttpUrl(#[from] HttpUrlError),
}
pub fn http_url(value: String) -> Result<HttpUrl, ResolveError> {
Ok(HttpUrl::try_new(value)?)
}