Skip to main content

courier/config/
types.rs

1use std::fmt;
2
3use serde_json::Value;
4
5use crate::pipeline::ErrorPolicy;
6use crate::retry::RetryPolicy;
7
8use super::redact::{RedactedJsonValue, RedactedOptionRetryPolicy, RedactedStr};
9
10#[derive(Clone, Default, PartialEq)]
11pub struct Config {
12    pub pipelines: Vec<PipelineSpec>,
13    pub observability: Option<super::observability::ObservabilityConfig>,
14}
15
16impl fmt::Debug for Config {
17    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
18        f.debug_struct("Config")
19            .field("pipelines", &self.pipelines)
20            .field("observability", &self.observability)
21            .finish()
22    }
23}
24
25#[derive(Clone, PartialEq)]
26pub struct PipelineSpec {
27    pub name: String,
28    pub source: SourceSpec,
29    pub transforms: Vec<TransformSpec>,
30    pub sinks: Vec<SinkSpec>,
31    pub channel_capacity: Option<usize>,
32}
33
34#[derive(Clone, PartialEq)]
35pub struct SourceSpec {
36    pub kind: String,
37    pub config: Value,
38    pub retry: Option<RetryPolicy>,
39}
40
41#[derive(Clone, PartialEq)]
42pub struct TransformSpec {
43    pub kind: String,
44    pub config: Value,
45    pub on_error: Option<ErrorPolicyConfig>,
46}
47
48#[derive(Clone, PartialEq)]
49pub struct SinkSpec {
50    pub kind: String,
51    pub config: Value,
52    pub on_error: Option<ErrorPolicyConfig>,
53    pub retry: Option<RetryPolicy>,
54}
55
56impl fmt::Debug for PipelineSpec {
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        f.debug_struct("PipelineSpec")
59            .field("name", &RedactedStr(&self.name))
60            .field("source", &self.source)
61            .field("transforms", &self.transforms)
62            .field("sinks", &self.sinks)
63            .field("channel_capacity", &self.channel_capacity)
64            .finish()
65    }
66}
67
68impl fmt::Debug for SourceSpec {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        f.debug_struct("SourceSpec")
71            .field("kind", &RedactedStr(&self.kind))
72            .field("config", &RedactedJsonValue(&self.config))
73            .field("retry", &RedactedOptionRetryPolicy(self.retry.as_ref()))
74            .finish()
75    }
76}
77
78impl fmt::Debug for TransformSpec {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        f.debug_struct("TransformSpec")
81            .field("kind", &RedactedStr(&self.kind))
82            .field("config", &RedactedJsonValue(&self.config))
83            .field("on_error", &self.on_error)
84            .finish()
85    }
86}
87
88impl fmt::Debug for SinkSpec {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.debug_struct("SinkSpec")
91            .field("kind", &RedactedStr(&self.kind))
92            .field("config", &RedactedJsonValue(&self.config))
93            .field("on_error", &self.on_error)
94            .field("retry", &RedactedOptionRetryPolicy(self.retry.as_ref()))
95            .finish()
96    }
97}
98
99#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
100pub enum ErrorPolicyConfig {
101    #[default]
102    Drop,
103    FailPipeline,
104}
105
106impl From<ErrorPolicyConfig> for ErrorPolicy {
107    fn from(value: ErrorPolicyConfig) -> Self {
108        match value {
109            ErrorPolicyConfig::Drop => ErrorPolicy::Drop,
110            ErrorPolicyConfig::FailPipeline => ErrorPolicy::FailPipeline,
111        }
112    }
113}