Skip to main content

courier/config/
validate.rs

1use std::collections::HashMap;
2
3use anyhow::{Context, Result, bail};
4use tracing_subscriber::EnvFilter;
5
6use crate::retry::RetryPolicy;
7
8use super::observability::ObservabilityConfig;
9use super::redact::{redact_secret, redact_secret_path};
10use super::types::Config;
11
12impl Config {
13    pub fn validate(&self) -> Result<()> {
14        if let Some(obs) = &self.observability {
15            validate_observability(obs)?;
16        }
17
18        let mut seen_names: HashMap<&str, usize> = HashMap::new();
19
20        for (pipeline_index, pipeline) in self.pipelines.iter().enumerate() {
21            let pipeline_path = format!("pipelines[{pipeline_index}]");
22            let pipeline_label = pipeline_label(pipeline_index, &pipeline.name);
23
24            if pipeline.name.trim().is_empty() {
25                bail!("{pipeline_path}.name: pipeline name must not be empty");
26            }
27
28            if let Some(previous_index) = seen_names.insert(pipeline.name.as_str(), pipeline_index)
29            {
30                bail!(
31                    "{pipeline_path}.name: duplicate pipeline name '{}' (already defined at pipelines[{previous_index}].name)",
32                    redact_secret(&pipeline.name)
33                );
34            }
35
36            if pipeline.source.kind.trim().is_empty() {
37                bail!("{pipeline_label}.source.type: source type must not be empty");
38            }
39
40            if let Some(retry) = &pipeline.source.retry {
41                validate_retry_policy(retry, &format!("{pipeline_label}.source.retry"), false)?;
42            }
43
44            if matches!(pipeline.channel_capacity, Some(0)) {
45                bail!("{pipeline_label}.channel_capacity: must be greater than 0");
46            }
47
48            if pipeline.sinks.is_empty() {
49                bail!("{pipeline_label}.sinks: at least one sink is required");
50            }
51
52            for (transform_index, transform) in pipeline.transforms.iter().enumerate() {
53                if transform.kind.trim().is_empty() {
54                    bail!(
55                        "{pipeline_label}.transforms[{transform_index}].type: transform type must not be empty"
56                    );
57                }
58            }
59
60            for (sink_index, sink) in pipeline.sinks.iter().enumerate() {
61                if sink.kind.trim().is_empty() {
62                    bail!("{pipeline_label}.sinks[{sink_index}].type: sink type must not be empty");
63                }
64                if let Some(retry) = &sink.retry {
65                    validate_retry_policy(
66                        retry,
67                        &format!("{pipeline_label}.sinks[{sink_index}].retry"),
68                        true,
69                    )?;
70                }
71            }
72        }
73
74        Ok(())
75    }
76}
77
78fn pipeline_label(index: usize, name: &str) -> String {
79    if name.trim().is_empty() {
80        format!("pipelines[{index}]")
81    } else {
82        format!("pipeline '{}'", redact_secret(name))
83    }
84}
85
86fn validate_observability(obs: &ObservabilityConfig) -> Result<()> {
87    if !(0.0..=1.0).contains(&obs.tracing.sample_ratio) || !obs.tracing.sample_ratio.is_finite() {
88        bail!(
89            "observability.tracing.sample_ratio: must be a finite value in [0.0, 1.0] (got {})",
90            obs.tracing.sample_ratio
91        );
92    }
93    if obs.service_name.trim().is_empty() {
94        bail!("observability.service_name: must not be empty");
95    }
96    if obs.metrics.export_interval_ms == 0 {
97        bail!("observability.metrics.export_interval_ms: must be greater than 0");
98    }
99    if let Some(level) = &obs.log_level {
100        if level.trim().is_empty() {
101            bail!("observability.log_level: must not be empty when set");
102        }
103        EnvFilter::try_new(level)
104            .with_context(|| "observability.log_level: invalid log filter directive")?;
105    }
106    Ok(())
107}
108
109fn validate_retry_policy(policy: &RetryPolicy, path: &str, allow_dead_letter: bool) -> Result<()> {
110    if policy.max_attempts == 0 {
111        bail!("{path}.max_attempts: must be greater than or equal to 1");
112    }
113
114    if policy.max_attempts > 1 {
115        if policy.initial_delay_ms == 0 {
116            bail!("{path}.initial_delay_ms: must be greater than 0 when max_attempts > 1");
117        }
118        if policy.max_delay_ms == 0 {
119            bail!("{path}.max_delay_ms: must be greater than 0 when max_attempts > 1");
120        }
121    }
122
123    if !policy.backoff_multiplier.is_finite() || policy.backoff_multiplier < 1.0 {
124        bail!("{path}.backoff_multiplier: must be finite and greater than or equal to 1.0");
125    }
126
127    if policy.max_delay_ms < policy.initial_delay_ms {
128        bail!("{path}.max_delay_ms: must be greater than or equal to initial_delay_ms");
129    }
130
131    if let crate::retry::ExhaustedPolicy::DeadLetter { path: dlq_path } = &policy.on_exhausted {
132        if !allow_dead_letter {
133            bail!("{path}.on_exhausted: dead_letter is only supported for sink retry policies");
134        }
135
136        if dlq_path.as_os_str().is_empty() {
137            bail!("{path}.on_exhausted.path: dead-letter path must not be empty");
138        }
139
140        if let Some(parent) = dlq_path.parent()
141            && !parent.as_os_str().is_empty()
142        {
143            if !parent.exists() {
144                bail!(
145                    "{path}.on_exhausted.path: parent directory '{}' does not exist",
146                    redact_secret_path(parent)
147                );
148            }
149            if !parent.is_dir() {
150                bail!(
151                    "{path}.on_exhausted.path: parent '{}' is not a directory",
152                    redact_secret_path(parent)
153                );
154            }
155        }
156    }
157
158    Ok(())
159}
160
161#[cfg(test)]
162mod tests {
163    use crate::config::Config;
164
165    fn minimal_pipeline_block() -> &'static str {
166        r#"
167            [[pipelines]]
168            name = "p"
169            [pipelines.source]
170            type = "noop"
171            [[pipelines.sinks]]
172            type = "noop"
173        "#
174    }
175
176    #[test]
177    fn validate_rejects_empty_pipeline_name() {
178        let config = Config::from_toml_str(
179            r#"
180            [[pipelines]]
181            name = "  "
182            [pipelines.source]
183            type = "noop"
184            [[pipelines.sinks]]
185            type = "noop"
186            "#,
187        )
188        .unwrap();
189
190        let msg = format!("{:#}", config.validate().unwrap_err());
191        assert!(msg.contains("pipelines[0].name"), "{msg}");
192        assert!(msg.contains("must not be empty"), "{msg}");
193    }
194
195    #[test]
196    fn validate_rejects_duplicate_pipeline_names_in_single_config() {
197        let config = Config::from_toml_str(
198            r#"
199            [[pipelines]]
200            name = "dup"
201            [pipelines.source]
202            type = "noop"
203            [[pipelines.sinks]]
204            type = "noop"
205
206            [[pipelines]]
207            name = "dup"
208            [pipelines.source]
209            type = "noop"
210            [[pipelines.sinks]]
211            type = "noop"
212            "#,
213        )
214        .unwrap();
215
216        let msg = format!("{:#}", config.validate().unwrap_err());
217        assert!(msg.contains("pipelines[1].name"), "{msg}");
218        assert!(msg.contains("duplicate pipeline name 'dup'"), "{msg}");
219    }
220
221    #[test]
222    fn validate_rejects_zero_channel_capacity() {
223        let config = Config::from_toml_str(
224            r#"
225            [[pipelines]]
226            name = "p"
227            channel_capacity = 0
228            [pipelines.source]
229            type = "noop"
230            [[pipelines.sinks]]
231            type = "noop"
232            "#,
233        )
234        .unwrap();
235
236        let msg = format!("{:#}", config.validate().unwrap_err());
237        assert!(msg.contains("pipeline 'p'.channel_capacity"), "{msg}");
238        assert!(msg.contains("greater than 0"), "{msg}");
239    }
240
241    #[test]
242    fn validate_rejects_missing_sinks() {
243        let config = Config::from_toml_str(
244            r#"
245            [[pipelines]]
246            name = "p"
247            [pipelines.source]
248            type = "noop"
249            "#,
250        )
251        .unwrap();
252
253        let msg = format!("{:#}", config.validate().unwrap_err());
254        assert!(msg.contains("pipeline 'p'.sinks"), "{msg}");
255        assert!(msg.contains("at least one sink is required"), "{msg}");
256    }
257
258    #[test]
259    fn validate_rejects_invalid_source_retry_policy() {
260        let config = Config::from_toml_str(
261            r#"
262            [[pipelines]]
263            name = "p"
264
265            [pipelines.source]
266            type = "api_poll"
267
268            [pipelines.source.retry]
269            max_attempts = 0
270            initial_delay_ms = 1
271            backoff_multiplier = 1.0
272            max_delay_ms = 1
273
274            [[pipelines.sinks]]
275            type = "noop"
276            "#,
277        )
278        .unwrap();
279
280        let msg = format!("{:#}", config.validate().unwrap_err());
281        assert!(
282            msg.contains("pipeline 'p'.source.retry.max_attempts"),
283            "{msg}"
284        );
285    }
286
287    #[test]
288    fn validate_rejects_source_retry_dead_letter() {
289        let dir = tempfile::tempdir().unwrap();
290        let dlq_path = dir.path().join("source-dlq.jsonl");
291        let config = Config::from_toml_str(&format!(
292            r#"
293            [[pipelines]]
294            name = "p"
295
296            [pipelines.source]
297            type = "api_poll"
298
299            [pipelines.source.retry]
300            max_attempts = 3
301            initial_delay_ms = 1
302            backoff_multiplier = 1.0
303            max_delay_ms = 1
304            on_exhausted = {{ kind = "dead_letter", path = "{}" }}
305
306            [[pipelines.sinks]]
307            type = "noop"
308            "#,
309            dlq_path.display()
310        ))
311        .unwrap();
312
313        let msg = format!("{:#}", config.validate().unwrap_err());
314        assert!(
315            msg.contains("pipeline 'p'.source.retry.on_exhausted"),
316            "{msg}"
317        );
318        assert!(
319            msg.contains("dead_letter is only supported for sink retry policies"),
320            "{msg}"
321        );
322    }
323
324    #[test]
325    fn validate_rejects_invalid_retry_policy() {
326        let config = Config::from_toml_str(
327            r#"
328            [[pipelines]]
329            name = "p"
330            [pipelines.source]
331            type = "noop"
332            [[pipelines.sinks]]
333            type = "noop"
334            [pipelines.sinks.retry]
335            max_attempts = 0
336            initial_delay_ms = 1
337            backoff_multiplier = 1.0
338            max_delay_ms = 1
339            "#,
340        )
341        .unwrap();
342
343        let msg = format!("{:#}", config.validate().unwrap_err());
344        assert!(
345            msg.contains("pipeline 'p'.sinks[0].retry.max_attempts"),
346            "{msg}"
347        );
348    }
349
350    #[test]
351    fn validate_rejects_dead_letter_parent_that_is_not_directory() {
352        let dir = tempfile::tempdir().unwrap();
353        let parent_file = dir.path().join("not-a-dir");
354        std::fs::write(&parent_file, "").unwrap();
355        let dlq_path = parent_file.join("dlq.jsonl");
356        let config = Config::from_toml_str(&format!(
357            r#"
358            [[pipelines]]
359            name = "p"
360            [pipelines.source]
361            type = "noop"
362            [[pipelines.sinks]]
363            type = "noop"
364            [pipelines.sinks.retry]
365            max_attempts = 1
366            initial_delay_ms = 0
367            backoff_multiplier = 1.0
368            max_delay_ms = 0
369            on_exhausted = {{ kind = "dead_letter", path = "{}" }}
370            "#,
371            dlq_path.display()
372        ))
373        .unwrap();
374
375        let msg = format!("{:#}", config.validate().unwrap_err());
376        assert!(
377            msg.contains("pipeline 'p'.sinks[0].retry.on_exhausted.path"),
378            "{msg}"
379        );
380        assert!(msg.contains("is not a directory"), "{msg}");
381    }
382
383    #[test]
384    fn validate_rejects_sample_ratio_above_one() {
385        let config = Config::from_toml_str(&format!(
386            r#"
387            [observability.tracing]
388            sample_ratio = 1.5
389            {}"#,
390            minimal_pipeline_block()
391        ))
392        .unwrap();
393        let msg = format!("{:#}", config.validate().unwrap_err());
394        assert!(msg.contains("observability.tracing.sample_ratio"), "{msg}");
395    }
396
397    #[test]
398    fn validate_rejects_negative_sample_ratio() {
399        let config = Config::from_toml_str(&format!(
400            r#"
401            [observability.tracing]
402            sample_ratio = -0.1
403            {}"#,
404            minimal_pipeline_block()
405        ))
406        .unwrap();
407        config.validate().unwrap_err();
408    }
409
410    #[test]
411    fn validate_rejects_empty_service_name() {
412        let config = Config::from_toml_str(&format!(
413            r#"
414            [observability]
415            service_name = ""
416            {}"#,
417            minimal_pipeline_block()
418        ))
419        .unwrap();
420        let msg = format!("{:#}", config.validate().unwrap_err());
421        assert!(msg.contains("observability.service_name"), "{msg}");
422    }
423
424    #[test]
425    fn validate_rejects_zero_export_interval() {
426        let config = Config::from_toml_str(&format!(
427            r#"
428            [observability.metrics]
429            export_interval_ms = 0
430            {}"#,
431            minimal_pipeline_block()
432        ))
433        .unwrap();
434        let msg = format!("{:#}", config.validate().unwrap_err());
435        assert!(
436            msg.contains("observability.metrics.export_interval_ms"),
437            "{msg}"
438        );
439    }
440
441    #[test]
442    fn validate_rejects_empty_log_level() {
443        let config = Config::from_toml_str(&format!(
444            r#"
445            [observability]
446            log_level = "   "
447            {}"#,
448            minimal_pipeline_block()
449        ))
450        .unwrap();
451        let msg = format!("{:#}", config.validate().unwrap_err());
452        assert!(msg.contains("observability.log_level"), "{msg}");
453    }
454
455    #[test]
456    fn validate_rejects_invalid_log_level() {
457        let config = Config::from_toml_str(&format!(
458            r#"
459            [observability]
460            log_level = "courier==debug"
461            {}"#,
462            minimal_pipeline_block()
463        ))
464        .unwrap();
465        let msg = format!("{:#}", config.validate().unwrap_err());
466        assert!(msg.contains("observability.log_level"), "{msg}");
467        assert!(msg.contains("invalid log filter directive"), "{msg}");
468    }
469}