data-courier 0.1.0-beta.2

Async Rust framework for composable data pipelines
Documentation
use std::path::Path;

use anyhow::{Context, Result};
use serde::de::DeserializeOwned;
use serde_json::Value;
use toml::{Table, Value as TomlValue};

use super::interpolate::interpolate_config_value;
use super::raw::RawConfig;
use super::redact::redact_secret_values_in_text;
use super::types::Config;

/// Deserializes a component's JSON configuration into the concrete type `T`.
///
/// `kind` is the component type name. It does not influence parsing; it is
/// only used to label the error when `config` cannot be turned into a `T`.
///
/// # Errors
///
/// Returns the error produced by `deserialize_json_value`, wrapped with the
/// context `invalid config for component type '<kind>'`.
pub fn parse_config<T: DeserializeOwned>(kind: &str, config: Value) -> Result<T> {
    let parsed: Result<T> = deserialize_json_value(config);
    parsed.with_context(|| format!("invalid config for component type '{kind}'"))
}

impl Config {
    /// Parses a configuration from TOML text, passing no base directory to
    /// the interpolation step.
    ///
    /// # Errors
    ///
    /// See [`Config::from_toml_str_with_base`].
    pub fn from_toml_str(s: &str) -> Result<Self> {
        let no_base_dir: Option<&Path> = None;
        Self::from_toml_str_with_base(s, no_base_dir)
    }

    /// Parses a configuration from TOML text.
    ///
    /// The text is parsed as TOML, converted to a JSON value tree, run
    /// through `interpolate_config_value` (which receives `base_dir`), and
    /// finally deserialized into `RawConfig` before being converted to
    /// `Config`.
    ///
    /// # Errors
    ///
    /// Fails with the context `failed to parse TOML config` when the text is
    /// not valid TOML or the interpolated document does not deserialize into
    /// `RawConfig`. Errors from interpolation are returned as-is.
    pub(super) fn from_toml_str_with_base(s: &str, base_dir: Option<&Path>) -> Result<Self> {
        let parsed = toml::from_str::<TomlValue>(s).context("failed to parse TOML config")?;
        let mut document = toml_value_to_json(parsed);
        interpolate_config_value(&mut document, base_dir)?;
        deserialize_json_value::<RawConfig>(document)
            .context("failed to parse TOML config")
            .map(|raw| raw.into())
    }

    /// Parses a configuration from JSON text, passing no base directory to
    /// the interpolation step.
    ///
    /// # Errors
    ///
    /// See [`Config::from_json_str_with_base`].
    pub fn from_json_str(s: &str) -> Result<Self> {
        let no_base_dir: Option<&Path> = None;
        Self::from_json_str_with_base(s, no_base_dir)
    }

    /// Parses a configuration from JSON text.
    ///
    /// The text is parsed into a JSON value tree, run through
    /// `interpolate_config_value` (which receives `base_dir`), and finally
    /// deserialized into `RawConfig` before being converted to `Config`.
    ///
    /// # Errors
    ///
    /// Fails with the context `failed to parse JSON config` when the text is
    /// not valid JSON or the interpolated document does not deserialize into
    /// `RawConfig`. Errors from interpolation are returned as-is.
    pub(super) fn from_json_str_with_base(s: &str, base_dir: Option<&Path>) -> Result<Self> {
        let mut document =
            serde_json::from_str::<Value>(s).context("failed to parse JSON config")?;
        interpolate_config_value(&mut document, base_dir)?;
        deserialize_json_value::<RawConfig>(document)
            .context("failed to parse JSON config")
            .map(|raw| raw.into())
    }
}

/// Parses `content` with the format selected by the file extension of `path`.
///
/// * `.json` is parsed as JSON.
/// * `.toml` is parsed as TOML, and so is a path for which no extension
///   string is available (no extension, or one that is not valid UTF-8).
/// * Any other extension is rejected.
///
/// The comparison is an exact string match, so it is case-sensitive.
/// `base_dir` is handed to the selected parser unchanged.
///
/// # Errors
///
/// Returns an error naming the extension when it is neither `toml` nor
/// `json`; otherwise returns whatever the selected parser returns.
pub(super) fn parse_by_extension(
    path: &Path,
    content: &str,
    base_dir: Option<&Path>,
) -> Result<Config> {
    let extension = path.extension().and_then(|ext| ext.to_str());

    match extension {
        None | Some("toml") => Config::from_toml_str_with_base(content, base_dir),
        Some("json") => Config::from_json_str_with_base(content, base_dir),
        Some(other) => {
            let unsupported = anyhow::anyhow!(
                "unsupported config file extension '.{other}' (expected '.toml' or '.json')"
            );
            Err(unsupported)
        }
    }
}

/// Deserializes a JSON value into `T`, redacting the error text on failure.
///
/// The `serde_json` error is not returned directly. Its message is rendered
/// to a string, passed through `redact_secret_values_in_text`, and the result
/// becomes the message of a fresh `anyhow` error, so the original error is
/// not kept as a source.
fn deserialize_json_value<T: DeserializeOwned>(value: Value) -> Result<T> {
    match serde_json::from_value(value) {
        Ok(parsed) => Ok(parsed),
        Err(err) => {
            let message = err.to_string();
            Err(anyhow::anyhow!(
                "{}",
                redact_secret_values_in_text(&message)
            ))
        }
    }
}

/// Converts a TOML table into a JSON object, converting every entry's value
/// with `toml_value_to_json` and keeping its key unchanged.
fn toml_table_to_json(table: Table) -> Value {
    let mut object = serde_json::Map::new();
    for (name, entry) in table {
        object.insert(name, toml_value_to_json(entry));
    }
    Value::Object(object)
}

/// Converts a parsed TOML value into a `serde_json::Value`.
///
/// Strings, integers, finite floats, booleans, arrays and tables map onto
/// their JSON counterparts (arrays and tables recursively). Two kinds of TOML
/// value have no JSON representation and are carried as JSON strings instead:
///
/// * datetimes, rendered through their `Display` implementation;
/// * non-finite floats, rendered with their TOML spelling: `"inf"`, `"-inf"`
///   or `"nan"` (the sign of a NaN is not preserved).
///
/// TOML allows `inf` and `nan` as float values, so they can appear in any
/// user-supplied config. This conversion therefore must not panic on them.
/// Keeping them as strings makes a typed numeric field fail deserialization
/// with an error that shows what was written, rather than aborting the
/// process or silently dropping the value.
fn toml_value_to_json(value: TomlValue) -> Value {
    match value {
        TomlValue::String(text) => Value::String(text),
        TomlValue::Integer(int) => Value::Number(serde_json::Number::from(int)),
        TomlValue::Float(float) => match serde_json::Number::from_f64(float) {
            Some(number) => Value::Number(number),
            // `from_f64` returns `None` exactly for infinite or NaN inputs.
            None => {
                let label = if float.is_nan() {
                    "nan"
                } else if float.is_sign_positive() {
                    "inf"
                } else {
                    "-inf"
                };
                Value::String(label.to_owned())
            }
        },
        TomlValue::Boolean(flag) => Value::Bool(flag),
        TomlValue::Datetime(datetime) => Value::String(datetime.to_string()),
        TomlValue::Array(items) => {
            Value::Array(items.into_iter().map(toml_value_to_json).collect())
        }
        TomlValue::Table(table) => toml_table_to_json(table),
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use serde_json::json;

    use crate::config::{
        Config, ENV_LOCK, ErrorPolicyConfig, REDACTED_SECRET, clear_secret_values_for_test,
        register_secret_value,
    };
    use crate::retry::ExhaustedPolicy;

    use super::parse_config;

    /// Sets a process environment variable for a test.
    fn set_env_var(key: &str, value: &str) {
        // SAFETY: mutating the process environment is only sound while no
        // other thread accesses it. The test that reaches this helper acquires
        // `ENV_LOCK` first; this presumably serializes every environment-
        // touching test in the crate (confirm against the other users of
        // `ENV_LOCK`).
        unsafe {
            std::env::set_var(key, value);
        }
    }

    /// Removes a process environment variable set by a test.
    fn remove_env_var(key: &str) {
        // SAFETY: same reasoning as `set_env_var`; the guard that calls this
        // is dropped while the test still holds `ENV_LOCK`.
        unsafe {
            std::env::remove_var(key);
        }
    }

    /// Sets an environment variable on construction and removes it on drop.
    ///
    /// Using a guard instead of a trailing `remove_env_var` call means the
    /// variable is also removed when an assertion in the test panics, so it
    /// cannot leak into tests that run later in the same process.
    struct EnvVarGuard {
        key: &'static str,
    }

    impl EnvVarGuard {
        fn set(key: &'static str, value: &str) -> Self {
            set_env_var(key, value);
            Self { key }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            remove_env_var(self.key);
        }
    }

    /// Fields other than the recognized ones (`type`, `on_error`, `retry`)
    /// end up in each component's free-form `config` value.
    #[test]
    fn preserves_arbitrary_component_fields() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "plugin-pipeline"
            channel_capacity = 16

            [pipelines.source]
            type = "plugin_source"
            nested = { enabled = true, limit = 3 }
            labels = ["a", "b"]

            [[pipelines.transforms]]
            type = "plugin_transform"
            on_error = "fail_pipeline"
            script = "return value"
            timeout_secs = 10

            [[pipelines.sinks]]
            type = "plugin_sink"
            endpoint = "https://example.test"
            headers = { authorization = "token" }
            "#,
        )
        .unwrap();

        assert_eq!(config.pipelines.len(), 1);
        let pipeline = &config.pipelines[0];
        assert_eq!(pipeline.channel_capacity, Some(16));
        assert_eq!(pipeline.source.kind, "plugin_source");
        assert_eq!(
            pipeline.source.config,
            json!({
                "nested": { "enabled": true, "limit": 3 },
                "labels": ["a", "b"]
            })
        );
        assert_eq!(pipeline.transforms[0].kind, "plugin_transform");
        assert_eq!(
            pipeline.transforms[0].on_error,
            Some(ErrorPolicyConfig::FailPipeline)
        );
        assert_eq!(
            pipeline.transforms[0].config,
            json!({
                "script": "return value",
                "timeout_secs": 10
            })
        );
        assert_eq!(pipeline.sinks[0].kind, "plugin_sink");
        assert_eq!(pipeline.sinks[0].on_error, None);
        assert_eq!(pipeline.sinks[0].retry, None);
        assert_eq!(
            pipeline.sinks[0].config,
            json!({
                "endpoint": "https://example.test",
                "headers": { "authorization": "token" }
            })
        );
    }

    /// A sink `retry` table, including a nested `on_exhausted` dead-letter
    /// policy, is parsed into the typed retry config and kept out of `config`.
    #[test]
    fn parses_retry_policy_with_dead_letter() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "with-retry"

            [pipelines.source]
            type = "noop"

            [[pipelines.sinks]]
            type = "noop"
            target = "x"

            [pipelines.sinks.retry]
            max_attempts = 5
            initial_delay_ms = 200
            backoff_multiplier = 2.0
            max_delay_ms = 5000

            [pipelines.sinks.retry.on_exhausted]
            kind = "dead_letter"
            path = "/tmp/dlq.jsonl"
            "#,
        )
        .unwrap();

        let sink = &config.pipelines[0].sinks[0];
        assert_eq!(sink.config, json!({ "target": "x" }));
        let retry = sink.retry.as_ref().expect("retry should deserialize");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 200);
        assert_eq!(retry.backoff_multiplier, 2.0);
        assert_eq!(retry.max_delay_ms, 5000);
        assert_eq!(
            retry.on_exhausted,
            ExhaustedPolicy::DeadLetter {
                path: PathBuf::from("/tmp/dlq.jsonl")
            }
        );
    }

    /// A sink without a `retry` table has `retry == None`.
    #[test]
    fn defaults_retry_to_none_when_omitted() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "no-retry"

            [pipelines.source]
            type = "noop"

            [[pipelines.sinks]]
            type = "noop"
            "#,
        )
        .unwrap();

        assert_eq!(config.pipelines[0].sinks[0].retry, None);
    }

    /// Malformed TOML is reported with the TOML parse context.
    #[test]
    fn from_toml_str_reports_parse_error() {
        let err = Config::from_toml_str("not valid toml ===").unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse TOML config"), "{msg}");
    }

    /// JSON counterpart of `preserves_arbitrary_component_fields`.
    #[test]
    fn from_json_str_preserves_arbitrary_component_fields() {
        let config = Config::from_json_str(
            r#"{
              "pipelines": [
                {
                  "name": "plugin-pipeline",
                  "channel_capacity": 16,
                  "source": {
                    "type": "plugin_source",
                    "nested": { "enabled": true, "limit": 3 },
                    "labels": ["a", "b"]
                  },
                  "transforms": [
                    {
                      "type": "plugin_transform",
                      "on_error": "fail_pipeline",
                      "script": "return value",
                      "timeout_secs": 10
                    }
                  ],
                  "sinks": [
                    {
                      "type": "plugin_sink",
                      "endpoint": "https://example.test",
                      "headers": { "authorization": "token" }
                    }
                  ]
                }
              ]
            }"#,
        )
        .unwrap();

        assert_eq!(config.pipelines.len(), 1);
        let pipeline = &config.pipelines[0];
        assert_eq!(pipeline.channel_capacity, Some(16));
        assert_eq!(
            pipeline.source.config,
            json!({
                "nested": { "enabled": true, "limit": 3 },
                "labels": ["a", "b"]
            })
        );
        assert_eq!(
            pipeline.transforms[0].on_error,
            Some(ErrorPolicyConfig::FailPipeline)
        );
        assert_eq!(
            pipeline.sinks[0].config,
            json!({
                "endpoint": "https://example.test",
                "headers": { "authorization": "token" }
            })
        );
    }

    /// Malformed JSON is reported with the JSON parse context.
    #[test]
    fn from_json_str_reports_parse_error() {
        let err = Config::from_json_str("{ not valid json ===").unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse JSON config"), "{msg}");
    }

    /// A secret interpolated into a field of the wrong type must not appear
    /// in the resulting deserialization error.
    #[test]
    fn config_parse_errors_redact_interpolated_secrets() {
        let _guard = ENV_LOCK.lock().unwrap();
        clear_secret_values_for_test();
        // Declared after `_guard` so it is dropped first: the variable is
        // removed while the lock is still held, even if an assertion panics.
        let _env = EnvVarGuard::set("COURIER_TEST_CHANNEL_CAPACITY", "super-secret-capacity");

        let err = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "p"
            channel_capacity = "${secret:COURIER_TEST_CHANNEL_CAPACITY}"

            [pipelines.source]
            type = "noop"

            [[pipelines.sinks]]
            type = "noop"
            "#,
        )
        .unwrap_err();

        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse TOML config"), "{msg}");
        assert!(!msg.contains("super-secret-capacity"), "{msg}");
        assert!(msg.contains(REDACTED_SECRET), "{msg}");
    }

    /// A registered secret echoed back by a component-level deserialization
    /// error is replaced with the redaction marker.
    #[test]
    fn component_parse_errors_redact_registered_secrets() {
        let _guard = ENV_LOCK.lock().unwrap();
        clear_secret_values_for_test();
        register_secret_value("component-secret-value");

        // Only deserialized to provoke a type error; the field is never read.
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct NeedsNumber {
            n: u64,
        }

        let err = parse_config::<NeedsNumber>(
            "demo",
            json!({
                "n": "component-secret-value"
            }),
        )
        .unwrap_err();

        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'demo'"),
            "{msg}"
        );
        assert!(!msg.contains("component-secret-value"), "{msg}");
        assert!(msg.contains(REDACTED_SECRET), "{msg}");
    }

    /// A `retry` table on the source is parsed into the typed retry config
    /// and kept out of the source's free-form `config`.
    #[test]
    fn parses_source_retry_policy() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "p"

            [pipelines.source]
            type = "api_poll"
            url = "https://example.test/data"
            interval_secs = 30

            [pipelines.source.retry]
            max_attempts = 5
            initial_delay_ms = 200
            backoff_multiplier = 2.0
            max_delay_ms = 5000
            on_exhausted = { kind = "propagate" }

            [[pipelines.sinks]]
            type = "noop"
            "#,
        )
        .unwrap();

        let source = &config.pipelines[0].source;
        assert_eq!(
            source.config,
            json!({ "url": "https://example.test/data", "interval_secs": 30 })
        );
        let retry = source.retry.as_ref().expect("source retry should parse");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 200);
    }
}