courier/config/
validate.rs1use std::collections::HashMap;
2
3use anyhow::{Context, Result, bail};
4use tracing_subscriber::EnvFilter;
5
6use crate::retry::RetryPolicy;
7
8use super::observability::ObservabilityConfig;
9use super::redact::{redact_secret, redact_secret_path};
10use super::types::Config;
11
12impl Config {
13 pub fn validate(&self) -> Result<()> {
14 if let Some(obs) = &self.observability {
15 validate_observability(obs)?;
16 }
17
18 let mut seen_names: HashMap<&str, usize> = HashMap::new();
19
20 for (pipeline_index, pipeline) in self.pipelines.iter().enumerate() {
21 let pipeline_path = format!("pipelines[{pipeline_index}]");
22 let pipeline_label = pipeline_label(pipeline_index, &pipeline.name);
23
24 if pipeline.name.trim().is_empty() {
25 bail!("{pipeline_path}.name: pipeline name must not be empty");
26 }
27
28 if let Some(previous_index) = seen_names.insert(pipeline.name.as_str(), pipeline_index)
29 {
30 bail!(
31 "{pipeline_path}.name: duplicate pipeline name '{}' (already defined at pipelines[{previous_index}].name)",
32 redact_secret(&pipeline.name)
33 );
34 }
35
36 if pipeline.source.kind.trim().is_empty() {
37 bail!("{pipeline_label}.source.type: source type must not be empty");
38 }
39
40 if let Some(retry) = &pipeline.source.retry {
41 validate_retry_policy(retry, &format!("{pipeline_label}.source.retry"), false)?;
42 }
43
44 if matches!(pipeline.channel_capacity, Some(0)) {
45 bail!("{pipeline_label}.channel_capacity: must be greater than 0");
46 }
47
48 if pipeline.sinks.is_empty() {
49 bail!("{pipeline_label}.sinks: at least one sink is required");
50 }
51
52 for (transform_index, transform) in pipeline.transforms.iter().enumerate() {
53 if transform.kind.trim().is_empty() {
54 bail!(
55 "{pipeline_label}.transforms[{transform_index}].type: transform type must not be empty"
56 );
57 }
58 }
59
60 for (sink_index, sink) in pipeline.sinks.iter().enumerate() {
61 if sink.kind.trim().is_empty() {
62 bail!("{pipeline_label}.sinks[{sink_index}].type: sink type must not be empty");
63 }
64 if let Some(retry) = &sink.retry {
65 validate_retry_policy(
66 retry,
67 &format!("{pipeline_label}.sinks[{sink_index}].retry"),
68 true,
69 )?;
70 }
71 }
72 }
73
74 Ok(())
75 }
76}
77
78fn pipeline_label(index: usize, name: &str) -> String {
79 if name.trim().is_empty() {
80 format!("pipelines[{index}]")
81 } else {
82 format!("pipeline '{}'", redact_secret(name))
83 }
84}
85
86fn validate_observability(obs: &ObservabilityConfig) -> Result<()> {
87 if !(0.0..=1.0).contains(&obs.tracing.sample_ratio) || !obs.tracing.sample_ratio.is_finite() {
88 bail!(
89 "observability.tracing.sample_ratio: must be a finite value in [0.0, 1.0] (got {})",
90 obs.tracing.sample_ratio
91 );
92 }
93 if obs.service_name.trim().is_empty() {
94 bail!("observability.service_name: must not be empty");
95 }
96 if obs.metrics.export_interval_ms == 0 {
97 bail!("observability.metrics.export_interval_ms: must be greater than 0");
98 }
99 if let Some(level) = &obs.log_level {
100 if level.trim().is_empty() {
101 bail!("observability.log_level: must not be empty when set");
102 }
103 EnvFilter::try_new(level)
104 .with_context(|| "observability.log_level: invalid log filter directive")?;
105 }
106 Ok(())
107}
108
109fn validate_retry_policy(policy: &RetryPolicy, path: &str, allow_dead_letter: bool) -> Result<()> {
110 if policy.max_attempts == 0 {
111 bail!("{path}.max_attempts: must be greater than or equal to 1");
112 }
113
114 if policy.max_attempts > 1 {
115 if policy.initial_delay_ms == 0 {
116 bail!("{path}.initial_delay_ms: must be greater than 0 when max_attempts > 1");
117 }
118 if policy.max_delay_ms == 0 {
119 bail!("{path}.max_delay_ms: must be greater than 0 when max_attempts > 1");
120 }
121 }
122
123 if !policy.backoff_multiplier.is_finite() || policy.backoff_multiplier < 1.0 {
124 bail!("{path}.backoff_multiplier: must be finite and greater than or equal to 1.0");
125 }
126
127 if policy.max_delay_ms < policy.initial_delay_ms {
128 bail!("{path}.max_delay_ms: must be greater than or equal to initial_delay_ms");
129 }
130
131 if let crate::retry::ExhaustedPolicy::DeadLetter { path: dlq_path } = &policy.on_exhausted {
132 if !allow_dead_letter {
133 bail!("{path}.on_exhausted: dead_letter is only supported for sink retry policies");
134 }
135
136 if dlq_path.as_os_str().is_empty() {
137 bail!("{path}.on_exhausted.path: dead-letter path must not be empty");
138 }
139
140 if let Some(parent) = dlq_path.parent()
141 && !parent.as_os_str().is_empty()
142 {
143 if !parent.exists() {
144 bail!(
145 "{path}.on_exhausted.path: parent directory '{}' does not exist",
146 redact_secret_path(parent)
147 );
148 }
149 if !parent.is_dir() {
150 bail!(
151 "{path}.on_exhausted.path: parent '{}' is not a directory",
152 redact_secret_path(parent)
153 );
154 }
155 }
156 }
157
158 Ok(())
159}
160
#[cfg(test)]
mod tests {
    // Unit tests for `Config::validate`. Each test parses a small TOML
    // document and checks that validation rejects it with a message naming
    // the offending setting.

    use crate::config::Config;

    /// A pipeline named `p` with a `noop` source and a single `noop` sink,
    /// appended to fixtures that only exercise the observability section.
    fn minimal_pipeline_block() -> &'static str {
        r#"
            [[pipelines]]
            name = "p"
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"
        "#
    }

    /// Parses `toml`, runs validation and returns the rendered error chain.
    ///
    /// Panics when the document does not parse or when validation
    /// unexpectedly succeeds.
    fn validation_error(toml: &str) -> String {
        let config = Config::from_toml_str(toml).unwrap();
        format!("{:#}", config.validate().unwrap_err())
    }

    #[test]
    fn validate_rejects_empty_pipeline_name() {
        let msg = validation_error(
            r#"
            [[pipelines]]
            name = " "
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"
            "#,
        );

        assert!(msg.contains("pipelines[0].name"), "{msg}");
        assert!(msg.contains("must not be empty"), "{msg}");
    }

    #[test]
    fn validate_rejects_duplicate_pipeline_names_in_single_config() {
        let msg = validation_error(
            r#"
            [[pipelines]]
            name = "dup"
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"

            [[pipelines]]
            name = "dup"
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"
            "#,
        );

        assert!(msg.contains("pipelines[1].name"), "{msg}");
        assert!(msg.contains("duplicate pipeline name 'dup'"), "{msg}");
    }

    #[test]
    fn validate_rejects_zero_channel_capacity() {
        let msg = validation_error(
            r#"
            [[pipelines]]
            name = "p"
            channel_capacity = 0
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"
            "#,
        );

        assert!(msg.contains("pipeline 'p'.channel_capacity"), "{msg}");
        assert!(msg.contains("greater than 0"), "{msg}");
    }

    #[test]
    fn validate_rejects_missing_sinks() {
        let msg = validation_error(
            r#"
            [[pipelines]]
            name = "p"
            [pipelines.source]
            type = "noop"
            "#,
        );

        assert!(msg.contains("pipeline 'p'.sinks"), "{msg}");
        assert!(msg.contains("at least one sink is required"), "{msg}");
    }

    #[test]
    fn validate_rejects_invalid_source_retry_policy() {
        let msg = validation_error(
            r#"
            [[pipelines]]
            name = "p"

            [pipelines.source]
            type = "api_poll"

            [pipelines.source.retry]
            max_attempts = 0
            initial_delay_ms = 1
            backoff_multiplier = 1.0
            max_delay_ms = 1

            [[pipelines.sinks]]
            type = "noop"
            "#,
        );

        assert!(
            msg.contains("pipeline 'p'.source.retry.max_attempts"),
            "{msg}"
        );
    }

    #[test]
    fn validate_rejects_source_retry_dead_letter() {
        let dir = tempfile::tempdir().unwrap();
        let dlq_path = dir.path().join("source-dlq.jsonl");
        // The path is embedded as a TOML literal string ('...') so that
        // backslashes in Windows paths are not interpreted as escapes.
        let msg = validation_error(&format!(
            r#"
            [[pipelines]]
            name = "p"

            [pipelines.source]
            type = "api_poll"

            [pipelines.source.retry]
            max_attempts = 3
            initial_delay_ms = 1
            backoff_multiplier = 1.0
            max_delay_ms = 1
            on_exhausted = {{ kind = "dead_letter", path = '{}' }}

            [[pipelines.sinks]]
            type = "noop"
            "#,
            dlq_path.display()
        ));

        assert!(
            msg.contains("pipeline 'p'.source.retry.on_exhausted"),
            "{msg}"
        );
        assert!(
            msg.contains("dead_letter is only supported for sink retry policies"),
            "{msg}"
        );
    }

    #[test]
    fn validate_rejects_invalid_retry_policy() {
        let msg = validation_error(
            r#"
            [[pipelines]]
            name = "p"
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"
            [pipelines.sinks.retry]
            max_attempts = 0
            initial_delay_ms = 1
            backoff_multiplier = 1.0
            max_delay_ms = 1
            "#,
        );

        assert!(
            msg.contains("pipeline 'p'.sinks[0].retry.max_attempts"),
            "{msg}"
        );
    }

    #[test]
    fn validate_rejects_dead_letter_parent_that_is_not_directory() {
        let dir = tempfile::tempdir().unwrap();
        // A regular file stands in for the dead-letter file's parent.
        let parent_file = dir.path().join("not-a-dir");
        std::fs::write(&parent_file, "").unwrap();
        let dlq_path = parent_file.join("dlq.jsonl");
        // Literal string ('...') keeps Windows path separators intact.
        let msg = validation_error(&format!(
            r#"
            [[pipelines]]
            name = "p"
            [pipelines.source]
            type = "noop"
            [[pipelines.sinks]]
            type = "noop"
            [pipelines.sinks.retry]
            max_attempts = 1
            initial_delay_ms = 0
            backoff_multiplier = 1.0
            max_delay_ms = 0
            on_exhausted = {{ kind = "dead_letter", path = '{}' }}
            "#,
            dlq_path.display()
        ));

        assert!(
            msg.contains("pipeline 'p'.sinks[0].retry.on_exhausted.path"),
            "{msg}"
        );
        assert!(msg.contains("is not a directory"), "{msg}");
    }

    #[test]
    fn validate_rejects_sample_ratio_above_one() {
        let msg = validation_error(&format!(
            r#"
            [observability.tracing]
            sample_ratio = 1.5
            {}"#,
            minimal_pipeline_block()
        ));

        assert!(msg.contains("observability.tracing.sample_ratio"), "{msg}");
    }

    #[test]
    fn validate_rejects_negative_sample_ratio() {
        let msg = validation_error(&format!(
            r#"
            [observability.tracing]
            sample_ratio = -0.1
            {}"#,
            minimal_pipeline_block()
        ));

        // Pin the failing setting so an unrelated validation error cannot
        // make this test pass.
        assert!(msg.contains("observability.tracing.sample_ratio"), "{msg}");
    }

    #[test]
    fn validate_rejects_empty_service_name() {
        let msg = validation_error(&format!(
            r#"
            [observability]
            service_name = ""
            {}"#,
            minimal_pipeline_block()
        ));

        assert!(msg.contains("observability.service_name"), "{msg}");
    }

    #[test]
    fn validate_rejects_zero_export_interval() {
        let msg = validation_error(&format!(
            r#"
            [observability.metrics]
            export_interval_ms = 0
            {}"#,
            minimal_pipeline_block()
        ));

        assert!(
            msg.contains("observability.metrics.export_interval_ms"),
            "{msg}"
        );
    }

    #[test]
    fn validate_rejects_empty_log_level() {
        let msg = validation_error(&format!(
            r#"
            [observability]
            log_level = " "
            {}"#,
            minimal_pipeline_block()
        ));

        assert!(msg.contains("observability.log_level"), "{msg}");
    }

    #[test]
    fn validate_rejects_invalid_log_level() {
        let msg = validation_error(&format!(
            r#"
            [observability]
            log_level = "courier==debug"
            {}"#,
            minimal_pipeline_block()
        ));

        assert!(msg.contains("observability.log_level"), "{msg}");
        assert!(msg.contains("invalid log filter directive"), "{msg}");
    }
}