1use std::fmt;
2
3use serde_json::Value;
4
5use crate::pipeline::ErrorPolicy;
6use crate::retry::RetryPolicy;
7
8use super::redact::{RedactedJsonValue, RedactedOptionRetryPolicy, RedactedStr};
9
10#[derive(Clone, Default, PartialEq)]
11pub struct Config {
12 pub pipelines: Vec<PipelineSpec>,
13 pub observability: Option<super::observability::ObservabilityConfig>,
14}
15
16impl fmt::Debug for Config {
17 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
18 f.debug_struct("Config")
19 .field("pipelines", &self.pipelines)
20 .field("observability", &self.observability)
21 .finish()
22 }
23}
24
25#[derive(Clone, PartialEq)]
26pub struct PipelineSpec {
27 pub name: String,
28 pub source: SourceSpec,
29 pub transforms: Vec<TransformSpec>,
30 pub sinks: Vec<SinkSpec>,
31 pub channel_capacity: Option<usize>,
32}
33
34#[derive(Clone, PartialEq)]
35pub struct SourceSpec {
36 pub kind: String,
37 pub config: Value,
38 pub retry: Option<RetryPolicy>,
39}
40
41#[derive(Clone, PartialEq)]
42pub struct TransformSpec {
43 pub kind: String,
44 pub config: Value,
45 pub on_error: Option<ErrorPolicyConfig>,
46}
47
48#[derive(Clone, PartialEq)]
49pub struct SinkSpec {
50 pub kind: String,
51 pub config: Value,
52 pub on_error: Option<ErrorPolicyConfig>,
53 pub retry: Option<RetryPolicy>,
54}
55
56impl fmt::Debug for PipelineSpec {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 f.debug_struct("PipelineSpec")
59 .field("name", &RedactedStr(&self.name))
60 .field("source", &self.source)
61 .field("transforms", &self.transforms)
62 .field("sinks", &self.sinks)
63 .field("channel_capacity", &self.channel_capacity)
64 .finish()
65 }
66}
67
68impl fmt::Debug for SourceSpec {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 f.debug_struct("SourceSpec")
71 .field("kind", &RedactedStr(&self.kind))
72 .field("config", &RedactedJsonValue(&self.config))
73 .field("retry", &RedactedOptionRetryPolicy(self.retry.as_ref()))
74 .finish()
75 }
76}
77
78impl fmt::Debug for TransformSpec {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 f.debug_struct("TransformSpec")
81 .field("kind", &RedactedStr(&self.kind))
82 .field("config", &RedactedJsonValue(&self.config))
83 .field("on_error", &self.on_error)
84 .finish()
85 }
86}
87
88impl fmt::Debug for SinkSpec {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 f.debug_struct("SinkSpec")
91 .field("kind", &RedactedStr(&self.kind))
92 .field("config", &RedactedJsonValue(&self.config))
93 .field("on_error", &self.on_error)
94 .field("retry", &RedactedOptionRetryPolicy(self.retry.as_ref()))
95 .finish()
96 }
97}
98
/// Configuration-level error policy selector.
///
/// Converts into the pipeline's `ErrorPolicy` through the `From`
/// implementation in this file. The default is [`ErrorPolicyConfig::Drop`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ErrorPolicyConfig {
    /// Maps to `ErrorPolicy::Drop`; this is the default variant.
    #[default]
    Drop,
    /// Maps to `ErrorPolicy::FailPipeline`.
    FailPipeline,
}
105
106impl From<ErrorPolicyConfig> for ErrorPolicy {
107 fn from(value: ErrorPolicyConfig) -> Self {
108 match value {
109 ErrorPolicyConfig::Drop => ErrorPolicy::Drop,
110 ErrorPolicyConfig::FailPipeline => ErrorPolicy::FailPipeline,
111 }
112 }
113}