1use std::path::Path;
2
3use anyhow::{Context, Result};
4use serde::de::DeserializeOwned;
5use serde_json::Value;
6use toml::{Table, Value as TomlValue};
7
8use super::interpolate::interpolate_config_value;
9use super::raw::RawConfig;
10use super::redact::redact_secret_values_in_text;
11use super::types::Config;
12
13pub fn parse_config<T: DeserializeOwned>(kind: &str, config: Value) -> Result<T> {
14 deserialize_json_value(config)
15 .with_context(|| format!("invalid config for component type '{kind}'"))
16}
17
18impl Config {
19 pub fn from_toml_str(s: &str) -> Result<Self> {
20 Self::from_toml_str_with_base(s, None)
21 }
22
23 pub(super) fn from_toml_str_with_base(s: &str, base_dir: Option<&Path>) -> Result<Self> {
24 let toml_value: TomlValue = toml::from_str(s).context("failed to parse TOML config")?;
25 let mut json_value =
26 toml_value_to_json(toml_value).context("failed to parse TOML config")?;
27 interpolate_config_value(&mut json_value, base_dir)?;
28 resolve_script_file_paths(&mut json_value, base_dir);
29 let raw: RawConfig =
30 deserialize_json_value(json_value).context("failed to parse TOML config")?;
31 Ok(raw.into())
32 }
33
34 pub fn from_json_str(s: &str) -> Result<Self> {
35 Self::from_json_str_with_base(s, None)
36 }
37
38 pub(super) fn from_json_str_with_base(s: &str, base_dir: Option<&Path>) -> Result<Self> {
39 let mut json_value: Value =
40 serde_json::from_str(s).context("failed to parse JSON config")?;
41 interpolate_config_value(&mut json_value, base_dir)?;
42 resolve_script_file_paths(&mut json_value, base_dir);
43 let raw: RawConfig =
44 deserialize_json_value(json_value).context("failed to parse JSON config")?;
45 Ok(raw.into())
46 }
47}
48
49pub(super) fn parse_by_extension(
50 path: &Path,
51 content: &str,
52 base_dir: Option<&Path>,
53) -> Result<Config> {
54 match path.extension().and_then(|s| s.to_str()) {
55 Some("json") => Config::from_json_str_with_base(content, base_dir),
56 Some("toml") | None => Config::from_toml_str_with_base(content, base_dir),
57 Some(other) => Err(anyhow::anyhow!(
58 "unsupported config file extension '.{other}' (expected '.toml' or '.json')"
59 )),
60 }
61}
62
63fn deserialize_json_value<T: DeserializeOwned>(value: Value) -> Result<T> {
64 serde_json::from_value(value)
65 .map_err(|err| anyhow::anyhow!("{}", redact_secret_values_in_text(&err.to_string())))
66}
67
68fn resolve_script_file_paths(value: &mut Value, base_dir: Option<&Path>) {
69 let Some(base_dir) = base_dir else {
70 return;
71 };
72 let Some(pipelines) = value.get_mut("pipelines").and_then(Value::as_array_mut) else {
73 return;
74 };
75
76 for pipeline in pipelines {
77 let Some(transforms) = pipeline.get_mut("transforms").and_then(Value::as_array_mut) else {
78 continue;
79 };
80
81 for transform in transforms {
82 if transform.get("type").and_then(Value::as_str) != Some("script") {
83 continue;
84 }
85
86 let resolved = transform
87 .get("script_file")
88 .and_then(Value::as_str)
89 .and_then(|script_file| {
90 let path = Path::new(script_file);
91 path.is_relative()
92 .then(|| base_dir.join(path).to_string_lossy().into_owned())
93 });
94
95 if let Some(resolved) = resolved {
96 transform["script_file"] = Value::String(resolved);
97 }
98 }
99 }
100}
101
102fn toml_table_to_json(table: Table) -> Result<Value> {
103 Ok(Value::Object(
104 table
105 .into_iter()
106 .map(|(key, value)| toml_value_to_json(value).map(|value| (key, value)))
107 .collect::<Result<serde_json::Map<String, Value>>>()?,
108 ))
109}
110
111fn toml_value_to_json(value: TomlValue) -> Result<Value> {
112 Ok(match value {
113 TomlValue::String(value) => Value::String(value),
114 TomlValue::Integer(value) => Value::Number(serde_json::Number::from(value)),
115 TomlValue::Float(value) => Value::Number(
116 serde_json::Number::from_f64(value)
117 .ok_or_else(|| anyhow::anyhow!("non-finite TOML floats are not supported"))?,
118 ),
119 TomlValue::Boolean(value) => Value::Bool(value),
120 TomlValue::Datetime(value) => Value::String(value.to_string()),
121 TomlValue::Array(values) => Value::Array(
122 values
123 .into_iter()
124 .map(toml_value_to_json)
125 .collect::<Result<Vec<_>>>()?,
126 ),
127 TomlValue::Table(table) => toml_table_to_json(table)?,
128 })
129}
130
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use serde_json::json;

    use crate::config::{
        Config, ENV_LOCK, ErrorPolicyConfig, REDACTED_SECRET, clear_secret_values_for_test,
        register_secret_value,
    };
    use crate::retry::ExhaustedPolicy;

    use super::parse_config;

    /// Sets a process environment variable for the duration of a test.
    fn set_env_var(key: &str, value: &str) {
        // SAFETY: mutating the environment is only sound while no other thread
        // reads or writes it. The only caller in this module holds `ENV_LOCK`
        // for as long as the variable exists.
        // NOTE(review): this assumes every other test that touches the
        // environment takes the same lock — confirm.
        unsafe {
            std::env::set_var(key, value);
        }
    }

    /// Removes a process environment variable set by [`set_env_var`].
    fn remove_env_var(key: &str) {
        // SAFETY: same argument as in `set_env_var`; the only caller runs while
        // the test still holds `ENV_LOCK`.
        unsafe {
            std::env::remove_var(key);
        }
    }

    /// Environment variable that is set on construction and removed on drop.
    ///
    /// Doing the cleanup in `Drop` means it also runs when a test assertion
    /// panics, so a failing test cannot leak the variable into later tests.
    struct ScopedEnvVar {
        key: &'static str,
    }

    impl ScopedEnvVar {
        /// Sets `key` to `value` and returns the guard that will remove it.
        fn set(key: &'static str, value: &str) -> Self {
            set_env_var(key, value);
            Self { key }
        }
    }

    impl Drop for ScopedEnvVar {
        fn drop(&mut self) {
            remove_env_var(self.key);
        }
    }

    /// Fields other than `type`, `on_error` and `retry` end up in each
    /// component's `config` value when parsing TOML.
    #[test]
    fn preserves_arbitrary_component_fields() {
        let config = Config::from_toml_str(
            r#"
 [[pipelines]]
 name = "plugin-pipeline"
 channel_capacity = 16

 [pipelines.source]
 type = "plugin_source"
 nested = { enabled = true, limit = 3 }
 labels = ["a", "b"]

 [[pipelines.transforms]]
 type = "plugin_transform"
 on_error = "fail_pipeline"
 script = "return value"
 timeout_secs = 10

 [[pipelines.sinks]]
 type = "plugin_sink"
 endpoint = "https://example.test"
 headers = { authorization = "token" }
 "#,
        )
        .unwrap();

        assert_eq!(config.pipelines.len(), 1);
        let pipeline = &config.pipelines[0];
        assert_eq!(pipeline.channel_capacity, Some(16));
        assert_eq!(pipeline.source.kind, "plugin_source");
        assert_eq!(
            pipeline.source.config,
            json!({
                "nested": { "enabled": true, "limit": 3 },
                "labels": ["a", "b"]
            })
        );
        assert_eq!(pipeline.transforms[0].kind, "plugin_transform");
        assert_eq!(
            pipeline.transforms[0].on_error,
            Some(ErrorPolicyConfig::FailPipeline)
        );
        assert_eq!(
            pipeline.transforms[0].config,
            json!({
                "script": "return value",
                "timeout_secs": 10
            })
        );
        assert_eq!(pipeline.sinks[0].kind, "plugin_sink");
        assert_eq!(pipeline.sinks[0].on_error, None);
        assert_eq!(pipeline.sinks[0].retry, None);
        assert_eq!(
            pipeline.sinks[0].config,
            json!({
                "endpoint": "https://example.test",
                "headers": { "authorization": "token" }
            })
        );
    }

    /// A sink `retry` table with a `dead_letter` exhaustion policy is parsed
    /// into the typed retry policy and kept out of the sink's `config`.
    #[test]
    fn parses_retry_policy_with_dead_letter() {
        let config = Config::from_toml_str(
            r#"
 [[pipelines]]
 name = "with-retry"

 [pipelines.source]
 type = "noop"

 [[pipelines.sinks]]
 type = "noop"
 target = "x"

 [pipelines.sinks.retry]
 max_attempts = 5
 initial_delay_ms = 200
 backoff_multiplier = 2.0
 max_delay_ms = 5000

 [pipelines.sinks.retry.on_exhausted]
 kind = "dead_letter"
 path = "/tmp/dlq.jsonl"
 "#,
        )
        .unwrap();

        let sink = &config.pipelines[0].sinks[0];
        assert_eq!(sink.config, json!({ "target": "x" }));
        let retry = sink.retry.as_ref().expect("retry should deserialize");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 200);
        assert_eq!(retry.backoff_multiplier, 2.0);
        assert_eq!(retry.max_delay_ms, 5000);
        assert_eq!(
            retry.on_exhausted,
            ExhaustedPolicy::DeadLetter {
                path: PathBuf::from("/tmp/dlq.jsonl")
            }
        );
    }

    /// A sink without a `retry` table has `retry == None`.
    #[test]
    fn defaults_retry_to_none_when_omitted() {
        let config = Config::from_toml_str(
            r#"
 [[pipelines]]
 name = "no-retry"

 [pipelines.source]
 type = "noop"

 [[pipelines.sinks]]
 type = "noop"
 "#,
        )
        .unwrap();

        assert_eq!(config.pipelines[0].sinks[0].retry, None);
    }

    /// Malformed TOML is reported with the TOML parse context.
    #[test]
    fn from_toml_str_reports_parse_error() {
        let err = Config::from_toml_str("not valid toml ===").unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse TOML config"), "{msg}");
    }

    /// `nan`, `inf` and `-inf` produce an error instead of a panic.
    #[test]
    fn from_toml_str_rejects_non_finite_floats_without_panicking() {
        for value in ["nan", "inf", "-inf"] {
            let err = Config::from_toml_str(&format!(
                r#"
 [[pipelines]]
 name = "p"

 [pipelines.source]
 type = "noop"
 threshold = {value}

 [[pipelines.sinks]]
 type = "noop"
 "#
            ))
            .unwrap_err();

            let msg = format!("{err:#}");
            assert!(msg.contains("failed to parse TOML config"), "{msg}");
            assert!(
                msg.contains("non-finite TOML floats are not supported"),
                "{msg}"
            );
        }
    }

    /// JSON counterpart of `preserves_arbitrary_component_fields`.
    #[test]
    fn from_json_str_preserves_arbitrary_component_fields() {
        let config = Config::from_json_str(
            r#"{
 "pipelines": [
 {
 "name": "plugin-pipeline",
 "channel_capacity": 16,
 "source": {
 "type": "plugin_source",
 "nested": { "enabled": true, "limit": 3 },
 "labels": ["a", "b"]
 },
 "transforms": [
 {
 "type": "plugin_transform",
 "on_error": "fail_pipeline",
 "script": "return value",
 "timeout_secs": 10
 }
 ],
 "sinks": [
 {
 "type": "plugin_sink",
 "endpoint": "https://example.test",
 "headers": { "authorization": "token" }
 }
 ]
 }
 ]
 }"#,
        )
        .unwrap();

        assert_eq!(config.pipelines.len(), 1);
        let pipeline = &config.pipelines[0];
        assert_eq!(pipeline.channel_capacity, Some(16));
        assert_eq!(
            pipeline.source.config,
            json!({
                "nested": { "enabled": true, "limit": 3 },
                "labels": ["a", "b"]
            })
        );
        assert_eq!(
            pipeline.transforms[0].on_error,
            Some(ErrorPolicyConfig::FailPipeline)
        );
        assert_eq!(
            pipeline.sinks[0].config,
            json!({
                "endpoint": "https://example.test",
                "headers": { "authorization": "token" }
            })
        );
    }

    /// Malformed JSON is reported with the JSON parse context.
    #[test]
    fn from_json_str_reports_parse_error() {
        let err = Config::from_json_str("{ not valid json ===").unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse JSON config"), "{msg}");
    }

    /// A secret interpolated into a field of the wrong type must not appear in
    /// the resulting error message.
    #[test]
    fn config_parse_errors_redact_interpolated_secrets() {
        let _guard = ENV_LOCK.lock().unwrap();
        clear_secret_values_for_test();
        // Declared after `_guard`, so it is dropped first: the variable is
        // removed while the lock is still held, even if an assertion panics.
        let _env = ScopedEnvVar::set("COURIER_TEST_CHANNEL_CAPACITY", "super-secret-capacity");

        let err = Config::from_toml_str(
            r#"
 [[pipelines]]
 name = "p"
 channel_capacity = "${secret:COURIER_TEST_CHANNEL_CAPACITY}"

 [pipelines.source]
 type = "noop"

 [[pipelines.sinks]]
 type = "noop"
 "#,
        )
        .unwrap_err();

        let msg = format!("{err:#}");
        assert!(msg.contains("failed to parse TOML config"), "{msg}");
        assert!(!msg.contains("super-secret-capacity"), "{msg}");
        assert!(msg.contains(REDACTED_SECRET), "{msg}");
    }

    /// A registered secret echoed back by a component deserialization error is
    /// replaced by the redaction marker.
    #[test]
    fn component_parse_errors_redact_registered_secrets() {
        let _guard = ENV_LOCK.lock().unwrap();
        clear_secret_values_for_test();
        register_secret_value("component-secret-value");

        // `n` is never read: the struct exists only so that deserializing a
        // string into a `u64` field fails.
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct NeedsNumber {
            n: u64,
        }

        let err = parse_config::<NeedsNumber>(
            "demo",
            json!({
                "n": "component-secret-value"
            }),
        )
        .unwrap_err();

        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'demo'"),
            "{msg}"
        );
        assert!(!msg.contains("component-secret-value"), "{msg}");
        assert!(msg.contains(REDACTED_SECRET), "{msg}");
    }

    /// A source-level `retry` table is parsed and kept out of the source's
    /// `config`.
    #[test]
    fn parses_source_retry_policy() {
        let config = Config::from_toml_str(
            r#"
 [[pipelines]]
 name = "p"

 [pipelines.source]
 type = "api_poll"
 url = "https://example.test/data"
 interval_secs = 30

 [pipelines.source.retry]
 max_attempts = 5
 initial_delay_ms = 200
 backoff_multiplier = 2.0
 max_delay_ms = 5000
 on_exhausted = { kind = "propagate" }

 [[pipelines.sinks]]
 type = "noop"
 "#,
        )
        .unwrap();

        let source = &config.pipelines[0].source;
        assert_eq!(
            source.config,
            json!({ "url": "https://example.test/data", "interval_secs": 30 })
        );
        let retry = source.retry.as_ref().expect("source retry should parse");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 200);
    }
}