r-harvest 0.2.0

Transactional Outbox implementation for Kafka and Postgres
use color_eyre::{eyre::WrapErr, Result};
use config::{Config, File};
use lazy_static::lazy_static;
use serde::Deserialize;
use sqlx::postgres::PgConnectOptions;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::time::Duration;

static DEFAULT_CONFIG_FILE: &str = "harvester.yml";

lazy_static! {
    pub static ref APP: Settings = Settings::new().unwrap();
}
mod postgres_uri {
    use std::str::FromStr;

    use serde::{self, Deserialize, Deserializer};
    use sqlx::postgres::PgConnectOptions;

    pub fn deserialize<'de, D>(deserializer: D) -> Result<PgConnectOptions, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        PgConnectOptions::from_str(s.as_str()).map_err(serde::de::Error::custom)
    }
}

#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Settings {
    pub name: Option<String>,
    #[serde(deserialize_with = "postgres_uri::deserialize")]
    pub database_uri: PgConnectOptions,
    pub outbox_table: Option<String>,
    pub base_kafka_config: BaseKafkaConfig,
    pub producer_kafka_config: Option<HashMap<String, String>>,
    pub leader_topic: String,
    pub leader_group_id: String,
    pub health_probe: Option<SocketAddr>,
    pub leader_probe: Option<SocketAddr>,
    #[serde(default)]
    pub limits: HarvesterLimits,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BaseKafkaConfig {
    #[serde(rename = "bootstrap.servers")]
    bootstrap_servers: String,
    #[serde(flatten)]
    extra_config: HashMap<String, String>,
}

impl From<BaseKafkaConfig> for HashMap<String, String> {
    fn from(val: BaseKafkaConfig) -> Self {
        let mut config = val.extra_config.clone();
        config.insert(String::from("bootstrap.servers"), val.bootstrap_servers);
        config
    }
}

#[derive(Clone, Debug, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct HarvesterLimits {
    #[serde(with = "humantime_serde", default)]
    pub poll_duration: Option<Duration>,
    #[serde(with = "humantime_serde", default)]
    pub min_poll_duration: Option<Duration>,
    #[serde(with = "humantime_serde", default)]
    pub heartbeat_timeout: Option<Duration>,
    #[serde(with = "humantime_serde", default)]
    pub harvest_backoff: Option<Duration>,
    #[serde(with = "humantime_serde", default)]
    pub db_connect_timeout: Option<Duration>,
    #[serde(default)]
    pub harvest_limit: Option<u32>,
}

impl Settings {
    pub fn new() -> Result<Settings> {
        let mut settings = Config::new();

        let config_path =
            env::var("HARVEST_CONFIG").unwrap_or_else(|_| DEFAULT_CONFIG_FILE.to_string());
        settings
            .merge(File::with_name(&config_path))
            .wrap_err("Please make sure your configuration is present")?;

        settings
            .try_into()
            .wrap_err_with(|| format!("Failed to read configuration file: {}", config_path))
    }
}