Skip to main content

courier/config/
parse.rs

1use std::path::Path;
2
3use anyhow::{Context, Result};
4use serde::de::DeserializeOwned;
5use serde_json::Value;
6use toml::{Table, Value as TomlValue};
7
8use super::interpolate::interpolate_config_value;
9use super::raw::RawConfig;
10use super::redact::redact_secret_values_in_text;
11use super::types::Config;
12
13pub fn parse_config<T: DeserializeOwned>(kind: &str, config: Value) -> Result<T> {
14    deserialize_json_value(config)
15        .with_context(|| format!("invalid config for component type '{kind}'"))
16}
17
18impl Config {
19    pub fn from_toml_str(s: &str) -> Result<Self> {
20        Self::from_toml_str_with_base(s, None)
21    }
22
23    pub(super) fn from_toml_str_with_base(s: &str, base_dir: Option<&Path>) -> Result<Self> {
24        let toml_value: TomlValue = toml::from_str(s).context("failed to parse TOML config")?;
25        let mut json_value =
26            toml_value_to_json(toml_value).context("failed to parse TOML config")?;
27        interpolate_config_value(&mut json_value, base_dir)?;
28        resolve_script_file_paths(&mut json_value, base_dir);
29        let raw: RawConfig =
30            deserialize_json_value(json_value).context("failed to parse TOML config")?;
31        Ok(raw.into())
32    }
33
34    pub fn from_json_str(s: &str) -> Result<Self> {
35        Self::from_json_str_with_base(s, None)
36    }
37
38    pub(super) fn from_json_str_with_base(s: &str, base_dir: Option<&Path>) -> Result<Self> {
39        let mut json_value: Value =
40            serde_json::from_str(s).context("failed to parse JSON config")?;
41        interpolate_config_value(&mut json_value, base_dir)?;
42        resolve_script_file_paths(&mut json_value, base_dir);
43        let raw: RawConfig =
44            deserialize_json_value(json_value).context("failed to parse JSON config")?;
45        Ok(raw.into())
46    }
47}
48
49pub(super) fn parse_by_extension(
50    path: &Path,
51    content: &str,
52    base_dir: Option<&Path>,
53) -> Result<Config> {
54    match path.extension().and_then(|s| s.to_str()) {
55        Some("json") => Config::from_json_str_with_base(content, base_dir),
56        Some("toml") | None => Config::from_toml_str_with_base(content, base_dir),
57        Some(other) => Err(anyhow::anyhow!(
58            "unsupported config file extension '.{other}' (expected '.toml' or '.json')"
59        )),
60    }
61}
62
63fn deserialize_json_value<T: DeserializeOwned>(value: Value) -> Result<T> {
64    serde_json::from_value(value)
65        .map_err(|err| anyhow::anyhow!("{}", redact_secret_values_in_text(&err.to_string())))
66}
67
68fn resolve_script_file_paths(value: &mut Value, base_dir: Option<&Path>) {
69    let Some(base_dir) = base_dir else {
70        return;
71    };
72    let Some(pipelines) = value.get_mut("pipelines").and_then(Value::as_array_mut) else {
73        return;
74    };
75
76    for pipeline in pipelines {
77        let Some(transforms) = pipeline.get_mut("transforms").and_then(Value::as_array_mut) else {
78            continue;
79        };
80
81        for transform in transforms {
82            if transform.get("type").and_then(Value::as_str) != Some("script") {
83                continue;
84            }
85
86            let resolved = transform
87                .get("script_file")
88                .and_then(Value::as_str)
89                .and_then(|script_file| {
90                    let path = Path::new(script_file);
91                    path.is_relative()
92                        .then(|| base_dir.join(path).to_string_lossy().into_owned())
93                });
94
95            if let Some(resolved) = resolved {
96                transform["script_file"] = Value::String(resolved);
97            }
98        }
99    }
100}
101
102fn toml_table_to_json(table: Table) -> Result<Value> {
103    Ok(Value::Object(
104        table
105            .into_iter()
106            .map(|(key, value)| toml_value_to_json(value).map(|value| (key, value)))
107            .collect::<Result<serde_json::Map<String, Value>>>()?,
108    ))
109}
110
111fn toml_value_to_json(value: TomlValue) -> Result<Value> {
112    Ok(match value {
113        TomlValue::String(value) => Value::String(value),
114        TomlValue::Integer(value) => Value::Number(serde_json::Number::from(value)),
115        TomlValue::Float(value) => Value::Number(
116            serde_json::Number::from_f64(value)
117                .ok_or_else(|| anyhow::anyhow!("non-finite TOML floats are not supported"))?,
118        ),
119        TomlValue::Boolean(value) => Value::Bool(value),
120        TomlValue::Datetime(value) => Value::String(value.to_string()),
121        TomlValue::Array(values) => Value::Array(
122            values
123                .into_iter()
124                .map(toml_value_to_json)
125                .collect::<Result<Vec<_>>>()?,
126        ),
127        TomlValue::Table(table) => toml_table_to_json(table)?,
128    })
129}
130
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use serde_json::json;

    use crate::config::{
        Config, ENV_LOCK, ErrorPolicyConfig, REDACTED_SECRET, clear_secret_values_for_test,
        register_secret_value,
    };
    use crate::retry::ExhaustedPolicy;

    use super::parse_config;

    /// Sets a process environment variable for a test.
    fn set_env_var(key: &str, value: &str) {
        // SAFETY: mutating the environment is only sound while no other thread
        // reads or writes it. Callers in this module hold `ENV_LOCK` for the
        // whole time the variable is set — assumes every other env-touching
        // test in the crate takes the same lock (TODO confirm).
        unsafe {
            std::env::set_var(key, value);
        }
    }

    /// Removes a process environment variable set by a test.
    fn remove_env_var(key: &str) {
        // SAFETY: same reasoning as `set_env_var`; callers hold `ENV_LOCK`.
        unsafe {
            std::env::remove_var(key);
        }
    }

    /// Keeps an environment variable set for the lifetime of the guard and
    /// removes it on drop, so a failing assertion cannot leak the variable
    /// into tests that run later in the same process.
    struct ScopedEnvVar {
        key: &'static str,
    }

    impl ScopedEnvVar {
        /// Sets `key` to `value`; the caller must already hold `ENV_LOCK` and
        /// must declare this guard after the lock guard so it is dropped first.
        fn set(key: &'static str, value: &str) -> Self {
            set_env_var(key, value);
            Self { key }
        }
    }

    impl Drop for ScopedEnvVar {
        fn drop(&mut self) {
            remove_env_var(self.key);
        }
    }

    #[test]
    fn preserves_arbitrary_component_fields() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "plugin-pipeline"
            channel_capacity = 16

            [pipelines.source]
            type = "plugin_source"
            nested = { enabled = true, limit = 3 }
            labels = ["a", "b"]

            [[pipelines.transforms]]
            type = "plugin_transform"
            on_error = "fail_pipeline"
            script = "return value"
            timeout_secs = 10

            [[pipelines.sinks]]
            type = "plugin_sink"
            endpoint = "https://example.test"
            headers = { authorization = "token" }
            "#,
        )
        .unwrap();

        assert_eq!(config.pipelines.len(), 1);
        let pipeline = &config.pipelines[0];
        assert_eq!(pipeline.channel_capacity, Some(16));
        assert_eq!(pipeline.source.kind, "plugin_source");
        assert_eq!(
            pipeline.source.config,
            json!({
                "nested": { "enabled": true, "limit": 3 },
                "labels": ["a", "b"]
            })
        );
        assert_eq!(pipeline.transforms[0].kind, "plugin_transform");
        assert_eq!(
            pipeline.transforms[0].on_error,
            Some(ErrorPolicyConfig::FailPipeline)
        );
        assert_eq!(
            pipeline.transforms[0].config,
            json!({
                "script": "return value",
                "timeout_secs": 10
            })
        );
        assert_eq!(pipeline.sinks[0].kind, "plugin_sink");
        assert_eq!(pipeline.sinks[0].on_error, None);
        assert_eq!(pipeline.sinks[0].retry, None);
        assert_eq!(
            pipeline.sinks[0].config,
            json!({
                "endpoint": "https://example.test",
                "headers": { "authorization": "token" }
            })
        );
    }

    #[test]
    fn parses_retry_policy_with_dead_letter() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "with-retry"

            [pipelines.source]
            type = "noop"

            [[pipelines.sinks]]
            type = "noop"
            target = "x"

            [pipelines.sinks.retry]
            max_attempts = 5
            initial_delay_ms = 200
            backoff_multiplier = 2.0
            max_delay_ms = 5000

            [pipelines.sinks.retry.on_exhausted]
            kind = "dead_letter"
            path = "/tmp/dlq.jsonl"
            "#,
        )
        .unwrap();

        let sink = &config.pipelines[0].sinks[0];
        assert_eq!(sink.config, json!({ "target": "x" }));
        let retry = sink.retry.as_ref().expect("retry should deserialize");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 200);
        assert_eq!(retry.backoff_multiplier, 2.0);
        assert_eq!(retry.max_delay_ms, 5000);
        assert_eq!(
            retry.on_exhausted,
            ExhaustedPolicy::DeadLetter {
                path: PathBuf::from("/tmp/dlq.jsonl")
            }
        );
    }

    #[test]
    fn defaults_retry_to_none_when_omitted() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "no-retry"

            [pipelines.source]
            type = "noop"

            [[pipelines.sinks]]
            type = "noop"
            "#,
        )
        .unwrap();

        assert_eq!(config.pipelines[0].sinks[0].retry, None);
    }

    #[test]
    fn from_toml_str_reports_parse_error() {
        let err = Config::from_toml_str("not valid toml ===").unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse TOML config"), "{msg}");
    }

    #[test]
    fn from_toml_str_rejects_non_finite_floats_without_panicking() {
        for value in ["nan", "inf", "-inf"] {
            let err = Config::from_toml_str(&format!(
                r#"
                [[pipelines]]
                name = "p"

                [pipelines.source]
                type = "noop"
                threshold = {value}

                [[pipelines.sinks]]
                type = "noop"
                "#
            ))
            .unwrap_err();

            let msg = format!("{err:#}");
            assert!(msg.contains("failed to parse TOML config"), "{msg}");
            assert!(
                msg.contains("non-finite TOML floats are not supported"),
                "{msg}"
            );
        }
    }

    #[test]
    fn from_json_str_preserves_arbitrary_component_fields() {
        let config = Config::from_json_str(
            r#"{
              "pipelines": [
                {
                  "name": "plugin-pipeline",
                  "channel_capacity": 16,
                  "source": {
                    "type": "plugin_source",
                    "nested": { "enabled": true, "limit": 3 },
                    "labels": ["a", "b"]
                  },
                  "transforms": [
                    {
                      "type": "plugin_transform",
                      "on_error": "fail_pipeline",
                      "script": "return value",
                      "timeout_secs": 10
                    }
                  ],
                  "sinks": [
                    {
                      "type": "plugin_sink",
                      "endpoint": "https://example.test",
                      "headers": { "authorization": "token" }
                    }
                  ]
                }
              ]
            }"#,
        )
        .unwrap();

        assert_eq!(config.pipelines.len(), 1);
        let pipeline = &config.pipelines[0];
        assert_eq!(pipeline.channel_capacity, Some(16));
        assert_eq!(
            pipeline.source.config,
            json!({
                "nested": { "enabled": true, "limit": 3 },
                "labels": ["a", "b"]
            })
        );
        assert_eq!(
            pipeline.transforms[0].on_error,
            Some(ErrorPolicyConfig::FailPipeline)
        );
        assert_eq!(
            pipeline.sinks[0].config,
            json!({
                "endpoint": "https://example.test",
                "headers": { "authorization": "token" }
            })
        );
    }

    #[test]
    fn from_json_str_reports_parse_error() {
        let err = Config::from_json_str("{ not valid json ===").unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse JSON config"), "{msg}");
    }

    #[test]
    fn config_parse_errors_redact_interpolated_secrets() {
        let _guard = ENV_LOCK.lock().unwrap();
        clear_secret_values_for_test();
        // Declared after `_guard`, so it is dropped (and the variable removed)
        // before the lock is released — even if an assertion below panics.
        let _env = ScopedEnvVar::set("COURIER_TEST_CHANNEL_CAPACITY", "super-secret-capacity");

        let err = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "p"
            channel_capacity = "${secret:COURIER_TEST_CHANNEL_CAPACITY}"

            [pipelines.source]
            type = "noop"

            [[pipelines.sinks]]
            type = "noop"
            "#,
        )
        .unwrap_err();

        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse TOML config"), "{msg}");
        assert!(!msg.contains("super-secret-capacity"), "{msg}");
        assert!(msg.contains(REDACTED_SECRET), "{msg}");
    }

    #[test]
    fn component_parse_errors_redact_registered_secrets() {
        let _guard = ENV_LOCK.lock().unwrap();
        clear_secret_values_for_test();
        register_secret_value("component-secret-value");

        // The field is never read; the struct only exists to force a type
        // mismatch during deserialization.
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct NeedsNumber {
            n: u64,
        }

        let err = parse_config::<NeedsNumber>(
            "demo",
            json!({
                "n": "component-secret-value"
            }),
        )
        .unwrap_err();

        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'demo'"),
            "{msg}"
        );
        assert!(!msg.contains("component-secret-value"), "{msg}");
        assert!(msg.contains(REDACTED_SECRET), "{msg}");
    }

    #[test]
    fn parses_source_retry_policy() {
        let config = Config::from_toml_str(
            r#"
            [[pipelines]]
            name = "p"

            [pipelines.source]
            type = "api_poll"
            url = "https://example.test/data"
            interval_secs = 30

            [pipelines.source.retry]
            max_attempts = 5
            initial_delay_ms = 200
            backoff_multiplier = 2.0
            max_delay_ms = 5000
            on_exhausted = { kind = "propagate" }

            [[pipelines.sinks]]
            type = "noop"
            "#,
        )
        .unwrap();

        let source = &config.pipelines[0].source;
        assert_eq!(
            source.config,
            json!({ "url": "https://example.test/data", "interval_secs": 30 })
        );
        let retry = source.retry.as_ref().expect("source retry should parse");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 200);
    }
}