radicle-ci-broker 0.24.0

add integration to CI engines or systems to a Radicle node
Documentation
//! Configuration for the CI broker and related programs.

use std::{
    collections::HashMap,
    ffi::OsStr,
    path::{Path, PathBuf},
    time::Duration,
};

use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};

use crate::{
    adapter::{Adapter, AdapterError, Adapters},
    filter::{EventFilter, Trigger},
    logger,
    sensitive::Sensitive,
};

/// Value used for `Config::max_run_time` when the configuration file
/// does not set it: 3600 seconds.
const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
/// Value used for `Config::queue_len_interval` when the configuration
/// file does not set it: 3600 seconds.
const DEFAULT_QUEUE_LEN_INTERVAL: Duration = Duration::from_secs(3600);

/// Configuration for the CI broker, as read from a configuration
/// file.
///
/// Deserialization rejects any field that is not declared here.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
    /// Name of the adapter to use by default. Must be set if
    /// `filters` is not empty.
    default_adapter: Option<String>,

    /// Optional list of triggers. Every trigger must name an adapter
    /// that is defined in `adapters`.
    triggers: Option<Vec<TriggerConfig>>,

    /// Optional report directory.
    report_dir: Option<PathBuf>,

    /// Path given by the mandatory `db` field (presumably the
    /// broker's database file -- confirm against the callers).
    db: PathBuf,

    // This isn't actually used. The status page is updated about once
    // a second, see `UPDATE_IMTERVAL` in `src/notif.rs`. However, we
    // don't want to break existing configuration files, so we allow
    // it. We just don't use it anywhere.
    //
    // The field must take part in deserialization for that to work:
    // with `#[serde(skip)]` the key is not a known field, and
    // `deny_unknown_fields` would then reject any configuration file
    // that still sets it. It is only left out of serialized output.
    #[allow(dead_code)]
    #[serde(default, skip_serializing)]
    status_update_interval_seconds: Option<u64>,

    /// Adapter specifications, keyed by adapter name. Empty if the
    /// field is missing.
    #[serde(default)]
    adapters: HashMap<String, AdapterSpec>,

    /// Event filters. Empty if the field is missing.
    #[serde(default)]
    filters: Vec<EventFilter>,

    /// Maximum run time, written as a duration string such as
    /// `1min`. Defaults to `DEFAULT_MAX_RUN_TIME`.
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(default = "default_max_run_time")]
    max_run_time: Duration,

    /// Queue length interval, written as a duration string. Defaults
    /// to `DEFAULT_QUEUE_LEN_INTERVAL`.
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(default = "default_queue_len_interval")]
    queue_len_interval: Duration,

    /// Optional number of concurrent adapters. The accessor treats a
    /// missing value and zero as one.
    #[serde(default)]
    concurrent_adapters: Option<usize>,

    /// Optional free-form description.
    description: Option<String>,
}

/// Serde default for `Config::max_run_time`: returns
/// `DEFAULT_MAX_RUN_TIME`.
fn default_max_run_time() -> Duration {
    DEFAULT_MAX_RUN_TIME
}

/// Serde default for `Config::queue_len_interval`: returns
/// `DEFAULT_QUEUE_LEN_INTERVAL`.
fn default_queue_len_interval() -> Duration {
    DEFAULT_QUEUE_LEN_INTERVAL
}

impl Config {
    /// Read, parse, and validate the configuration file `filename`.
    ///
    /// A file whose extension is exactly `json` is parsed as JSON;
    /// every other file is parsed as YAML.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::ReadConfig` if the file can't be read,
    /// `ConfigError::ParseConfigJson` or `ConfigError::ParseConfig`
    /// if it can't be parsed, and whatever error the validation step
    /// reports otherwise.
    pub fn load(filename: &Path) -> Result<Self, ConfigError> {
        let bytes = std::fs::read(filename)
            .map_err(|err| ConfigError::ReadConfig(filename.to_path_buf(), err))?;

        let is_json = filename.extension().and_then(OsStr::to_str) == Some("json");
        let loaded: Config = if is_json {
            serde_json::from_slice(&bytes)
                .map_err(|err| ConfigError::ParseConfigJson(filename.to_path_buf(), err))?
        } else {
            serde_norway::from_slice(&bytes)
                .map_err(|err| ConfigError::ParseConfig(filename.to_path_buf(), err))?
        };

        loaded.check(filename)?;
        Ok(loaded)
    }

    /// Validate a freshly loaded configuration.
    ///
    /// Reports use of the deprecated `status_update_interval_seconds`
    /// field via the logger, then fails with
    /// `ConfigError::NoDefaultAdapter` if there are filters but no
    /// default adapter, or with `ConfigError::UnknownAdapter` for the
    /// first trigger that names an adapter missing from `adapters`.
    fn check(&self, filename: &Path) -> Result<(), ConfigError> {
        if self.status_update_interval_seconds.is_some() {
            logger::config_deprecated("status_update_interval_seconds", filename);
        }

        if self.default_adapter.is_none() && !self.filters.is_empty() {
            return Err(ConfigError::NoDefaultAdapter);
        }

        // A missing `triggers` field behaves like an empty list here.
        let undefined = self
            .triggers
            .iter()
            .flatten()
            .find(|trigger| !self.adapters.contains_key(&trigger.adapter));
        if let Some(trigger) = undefined {
            return Err(ConfigError::UnknownAdapter(trigger.adapter.clone()));
        }

        Ok(())
    }

    /// The configured description, if any.
    pub fn description(&self) -> Option<&str> {
        self.description.as_deref()
    }

    /// The configured report directory, if any.
    pub fn report_dir(&self) -> Option<&Path> {
        self.report_dir.as_deref()
    }

    /// The configured number of concurrent adapters. A missing value
    /// and a value of zero both yield one.
    pub fn concurrent_adapters(&self) -> usize {
        self.concurrent_adapters.unwrap_or(0).max(1)
    }

    /// Build an `Adapters` value from the adapter specifications and
    /// the default adapter name in this configuration.
    pub fn to_adapters(&self) -> Result<Adapters, AdapterError> {
        let default_name = self.default_adapter.as_deref();
        Adapters::new(&self.adapters, default_name)
    }

    /// The configured event filters.
    pub fn filters(&self) -> &[EventFilter] {
        self.filters.as_slice()
    }

    /// Convert every configured trigger into a `Trigger`. Returns an
    /// empty vector if no triggers are configured.
    pub fn to_triggers(&self) -> Vec<Trigger> {
        self.triggers
            .iter()
            .flatten()
            .map(Trigger::from)
            .collect()
    }

    /// The configured maximum run time.
    pub fn max_run_time(&self) -> Duration {
        self.max_run_time
    }

    /// The configured queue length interval.
    pub fn queue_len_interval(&self) -> Duration {
        self.queue_len_interval
    }

    /// The configured `db` path.
    pub fn db(&self) -> &Path {
        self.db.as_path()
    }

    /// Serialize the configuration as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::ToJson` if serialization fails.
    pub fn to_json(&self) -> Result<String, ConfigError> {
        let rendered = serde_json::to_string_pretty(self);
        rendered.map_err(ConfigError::ToJson)
    }
}

/// Specification of a single adapter, as written in the `adapters`
/// map of the configuration file.
///
/// Deserialization rejects any field that is not declared here.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct AdapterSpec {
    /// Command for the adapter. Mandatory.
    pub command: PathBuf,

    /// Environment variables for the adapter, as name/value pairs.
    /// Empty if the field is missing.
    #[serde(default)]
    env: HashMap<String, String>,

    /// Environment variables whose values are wrapped in
    /// `Sensitive`. Empty if the field is missing.
    #[serde(default)]
    sensitive_env: HashMap<String, Sensitive>,

    /// Configuration for the adapter. If the `Self::config_env` field
    /// is set, the configuration is serialized into YAML and written
    /// to a temporary file. The environment variable named in
    /// `config_env` is set to the path of the temporary file.
    #[serde(default)]
    config: HashMap<String, serde_norway::Value>,

    /// Optional name of the environment variable described for
    /// `config` above.
    config_env: Option<String>,
}

impl AdapterSpec {
    /// The plain environment variables configured for the adapter.
    pub fn envs(&self) -> &HashMap<String, String> {
        &self.env
    }

    /// The environment variables configured for the adapter whose
    /// values are wrapped in `Sensitive`.
    pub fn sensitive_envs(&self) -> &HashMap<String, Sensitive> {
        &self.sensitive_env
    }

    /// The adapter's own configuration map.
    pub fn config(&self) -> &HashMap<String, serde_norway::Value> {
        &self.config
    }

    /// The value of the `config_env` field, if it is set.
    pub fn config_env(&self) -> Option<&str> {
        self.config_env.as_deref()
    }
}

/// Build an `Adapter` from its specification in the configuration.
impl From<&AdapterSpec> for Adapter {
    fn from(spec: &AdapterSpec) -> Self {
        // The command, both environments, and a copy of the adapter
        // configuration are always applied.
        let base = Self::new(&spec.command)
            .with_environment(spec.envs())
            .with_sensitive_environment(spec.sensitive_envs())
            .with_config(spec.config.clone());

        // The config environment variable is only applied if the
        // specification names one.
        match &spec.config_env {
            Some(name) => base.with_config_env(name),
            None => base,
        }
    }
}

/// One entry of the `triggers` list in the configuration file.
#[derive(Debug, Serialize, Deserialize)]
pub struct TriggerConfig {
    /// Name of the adapter for this trigger. Loading a configuration
    /// fails if this name is not a key of the `adapters` map.
    pub adapter: String,
    /// Event filters for this trigger.
    pub filters: Vec<EventFilter>,
}

/// All possible errors from configuration handling.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// Can't read config file. Carries the path of the file and the
    /// underlying I/O error.
    #[error("could not read config file {0}")]
    ReadConfig(PathBuf, #[source] std::io::Error),

    /// Can't parse config file as YAML. Carries the path of the file
    /// and the underlying parser error.
    #[error("failed to parse configuration file as YAML: {0}")]
    ParseConfig(PathBuf, #[source] serde_norway::Error),

    /// Can't parse config file as JSON. Carries the path of the file
    /// and the underlying parser error.
    #[error("failed to parse configuration file as JSON: {0}")]
    ParseConfigJson(PathBuf, #[source] serde_json::Error),

    /// Can't convert configuration into JSON.
    #[error("failed to convert configuration into JSON")]
    ToJson(#[source] serde_json::Error),

    /// No default adapter, although the configuration has filters.
    #[error(
        "the default adapter is not defined in the configuration, but is required for the filters field"
    )]
    NoDefaultAdapter,

    /// Unknown adapter. Carries the adapter name that a trigger
    /// refers to but that is not defined in the configuration.
    #[error("'triggers' refers to adapter that hasn't been defined: {0}")]
    UnknownAdapter(String),
}

#[cfg(test)]
mod test {
    use super::*;

    /// An explicit `max_run_time` is parsed from its duration string.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn parse_config_yaml() {
        let yaml = r#"---
default_adapter: foo
adapters: {}
filters: []
db: "foo.db"
max_run_time: 1min
...
"#;

        let parsed: Config = serde_norway::from_str(yaml).unwrap();
        let one_minute = Duration::from_secs(60);
        assert_eq!(parsed.max_run_time(), one_minute);
    }

    /// A missing `max_run_time` falls back to the default.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn parse_config_yaml_without_max_run_time() {
        let yaml = r#"---
default_adapter: foo
adapters: {}
filters: []
db: "foo.db"
...
"#;

        let parsed: Config = serde_norway::from_str(yaml).unwrap();
        assert_eq!(parsed.max_run_time(), DEFAULT_MAX_RUN_TIME);
    }
}