datadog_api_client/datadogV2/model/
model_observability_pipeline_kafka_source.rs

1// Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2.0 License.
2// This product includes software developed at Datadog (https://www.datadoghq.com/).
3// Copyright 2019-Present Datadog, Inc.
4use serde::de::{Error, MapAccess, Visitor};
5use serde::{Deserialize, Deserializer, Serialize};
6use serde_with::skip_serializing_none;
7use std::fmt::{self, Formatter};
8
9/// The `kafka` source ingests data from Apache Kafka topics.
10#[non_exhaustive]
11#[skip_serializing_none]
12#[derive(Clone, Debug, PartialEq, Serialize)]
13pub struct ObservabilityPipelineKafkaSource {
14    /// Consumer group ID used by the Kafka client.
15    #[serde(rename = "group_id")]
16    pub group_id: String,
17    /// The unique identifier for this component. Used to reference this component in other parts of the pipeline (e.g., as input to downstream components).
18    #[serde(rename = "id")]
19    pub id: String,
20    /// Optional list of advanced Kafka client configuration options, defined as key-value pairs.
21    #[serde(rename = "librdkafka_options")]
22    pub librdkafka_options:
23        Option<Vec<crate::datadogV2::model::ObservabilityPipelineKafkaSourceLibrdkafkaOption>>,
24    /// Specifies the SASL mechanism for authenticating with a Kafka cluster.
25    #[serde(rename = "sasl")]
26    pub sasl: Option<crate::datadogV2::model::ObservabilityPipelineKafkaSourceSasl>,
27    /// Configuration for enabling TLS encryption between the pipeline component and external services.
28    #[serde(rename = "tls")]
29    pub tls: Option<crate::datadogV2::model::ObservabilityPipelineTls>,
30    /// A list of Kafka topic names to subscribe to. The source ingests messages from each topic specified.
31    #[serde(rename = "topics")]
32    pub topics: Vec<String>,
33    /// The source type. The value should always be `kafka`.
34    #[serde(rename = "type")]
35    pub type_: crate::datadogV2::model::ObservabilityPipelineKafkaSourceType,
36    #[serde(flatten)]
37    pub additional_properties: std::collections::BTreeMap<String, serde_json::Value>,
38    #[serde(skip)]
39    #[serde(default)]
40    pub(crate) _unparsed: bool,
41}
42
43impl ObservabilityPipelineKafkaSource {
44    pub fn new(
45        group_id: String,
46        id: String,
47        topics: Vec<String>,
48        type_: crate::datadogV2::model::ObservabilityPipelineKafkaSourceType,
49    ) -> ObservabilityPipelineKafkaSource {
50        ObservabilityPipelineKafkaSource {
51            group_id,
52            id,
53            librdkafka_options: None,
54            sasl: None,
55            tls: None,
56            topics,
57            type_,
58            additional_properties: std::collections::BTreeMap::new(),
59            _unparsed: false,
60        }
61    }
62
63    pub fn librdkafka_options(
64        mut self,
65        value: Vec<crate::datadogV2::model::ObservabilityPipelineKafkaSourceLibrdkafkaOption>,
66    ) -> Self {
67        self.librdkafka_options = Some(value);
68        self
69    }
70
71    pub fn sasl(
72        mut self,
73        value: crate::datadogV2::model::ObservabilityPipelineKafkaSourceSasl,
74    ) -> Self {
75        self.sasl = Some(value);
76        self
77    }
78
79    pub fn tls(mut self, value: crate::datadogV2::model::ObservabilityPipelineTls) -> Self {
80        self.tls = Some(value);
81        self
82    }
83
84    pub fn additional_properties(
85        mut self,
86        value: std::collections::BTreeMap<String, serde_json::Value>,
87    ) -> Self {
88        self.additional_properties = value;
89        self
90    }
91}
92
93impl<'de> Deserialize<'de> for ObservabilityPipelineKafkaSource {
94    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
95    where
96        D: Deserializer<'de>,
97    {
98        struct ObservabilityPipelineKafkaSourceVisitor;
99        impl<'a> Visitor<'a> for ObservabilityPipelineKafkaSourceVisitor {
100            type Value = ObservabilityPipelineKafkaSource;
101
102            fn expecting(&self, f: &mut Formatter<'_>) -> fmt::Result {
103                f.write_str("a mapping")
104            }
105
106            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
107            where
108                M: MapAccess<'a>,
109            {
110                let mut group_id: Option<String> = None;
111                let mut id: Option<String> = None;
112                let mut librdkafka_options: Option<
113                    Vec<crate::datadogV2::model::ObservabilityPipelineKafkaSourceLibrdkafkaOption>,
114                > = None;
115                let mut sasl: Option<
116                    crate::datadogV2::model::ObservabilityPipelineKafkaSourceSasl,
117                > = None;
118                let mut tls: Option<crate::datadogV2::model::ObservabilityPipelineTls> = None;
119                let mut topics: Option<Vec<String>> = None;
120                let mut type_: Option<
121                    crate::datadogV2::model::ObservabilityPipelineKafkaSourceType,
122                > = None;
123                let mut additional_properties: std::collections::BTreeMap<
124                    String,
125                    serde_json::Value,
126                > = std::collections::BTreeMap::new();
127                let mut _unparsed = false;
128
129                while let Some((k, v)) = map.next_entry::<String, serde_json::Value>()? {
130                    match k.as_str() {
131                        "group_id" => {
132                            group_id = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
133                        }
134                        "id" => {
135                            id = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
136                        }
137                        "librdkafka_options" => {
138                            if v.is_null() {
139                                continue;
140                            }
141                            librdkafka_options =
142                                Some(serde_json::from_value(v).map_err(M::Error::custom)?);
143                        }
144                        "sasl" => {
145                            if v.is_null() {
146                                continue;
147                            }
148                            sasl = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
149                        }
150                        "tls" => {
151                            if v.is_null() {
152                                continue;
153                            }
154                            tls = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
155                        }
156                        "topics" => {
157                            topics = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
158                        }
159                        "type" => {
160                            type_ = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
161                            if let Some(ref _type_) = type_ {
162                                match _type_ {
163                                    crate::datadogV2::model::ObservabilityPipelineKafkaSourceType::UnparsedObject(_type_) => {
164                                        _unparsed = true;
165                                    },
166                                    _ => {}
167                                }
168                            }
169                        }
170                        &_ => {
171                            if let Ok(value) = serde_json::from_value(v.clone()) {
172                                additional_properties.insert(k, value);
173                            }
174                        }
175                    }
176                }
177                let group_id = group_id.ok_or_else(|| M::Error::missing_field("group_id"))?;
178                let id = id.ok_or_else(|| M::Error::missing_field("id"))?;
179                let topics = topics.ok_or_else(|| M::Error::missing_field("topics"))?;
180                let type_ = type_.ok_or_else(|| M::Error::missing_field("type_"))?;
181
182                let content = ObservabilityPipelineKafkaSource {
183                    group_id,
184                    id,
185                    librdkafka_options,
186                    sasl,
187                    tls,
188                    topics,
189                    type_,
190                    additional_properties,
191                    _unparsed,
192                };
193
194                Ok(content)
195            }
196        }
197
198        deserializer.deserialize_any(ObservabilityPipelineKafkaSourceVisitor)
199    }
200}