1use faucet_common_kafka::{KafkaAuth, KafkaValueFormat, OnDecodeError};
4use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::time::Duration;
9
/// Configuration for the Kafka source.
///
/// [`KafkaSourceConfig::validate`] rejects blank `brokers`/`group_id`, an empty
/// `topics` list, a config with neither `max_messages` nor `idle_timeout`, and an
/// out-of-range `batch_size`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct KafkaSourceConfig {
    /// Kafka broker address(es), e.g. `localhost:9092`. Must not be empty or whitespace-only.
    // NOTE(review): presumably a comma-separated bootstrap-server list handed to the
    // Kafka client as-is — confirm against the consumer construction code.
    pub brokers: String,
    /// Topics to consume from. Must contain at least one entry.
    pub topics: Vec<String>,
    /// Consumer group id. Must not be empty or whitespace-only.
    pub group_id: String,
    /// Authentication settings. Defaults to `KafkaAuth::None` when omitted.
    #[serde(default)]
    pub auth: KafkaAuth,
    /// Format used to decode message values (e.g. `{"type": "json"}`).
    /// Uses `KafkaValueFormat::default()` when omitted.
    #[serde(default)]
    pub value_format: KafkaValueFormat,
    /// Optional format used to decode message keys. Left out of serialized output when unset.
    // NOTE(review): what happens to keys when this is `None` is decided by the consumer
    // code, not here — confirm before documenting it for users.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub key_format: Option<KafkaValueFormat>,
    /// Offset reset policy, `earliest` or `latest`. Defaults to `latest` when omitted.
    #[serde(default)]
    pub auto_offset_reset: OffsetReset,
    /// Optional message-count termination condition. At least one of `max_messages`
    /// and `idle_timeout` must be set. Left out of serialized output when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_messages: Option<usize>,
    /// Optional idle-time termination condition, given in seconds (schema type `u64`).
    /// At least one of `max_messages` and `idle_timeout` must be set. Left out of
    /// serialized output when unset.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "faucet_core::config::duration_secs_option"
    )]
    #[schemars(with = "Option<u64>")]
    pub idle_timeout: Option<Duration>,
    /// Poll timeout, given in seconds (schema type `u64`). Defaults to 1 second.
    #[serde(
        default = "default_poll_timeout",
        with = "faucet_core::config::duration_secs"
    )]
    #[schemars(with = "u64")]
    pub poll_timeout: Duration,
    /// Consumer session timeout, given in seconds (schema type `u64`). Defaults to 30 seconds.
    #[serde(
        default = "default_session_timeout",
        with = "faucet_core::config::duration_secs"
    )]
    #[schemars(with = "u64")]
    pub session_timeout: Duration,
    /// Behaviour when a message cannot be decoded. Uses `OnDecodeError::default()` when omitted.
    #[serde(default)]
    pub on_decode_error: OnDecodeError,
    /// Additional string key/value client settings. Left out of serialized output when empty.
    // NOTE(review): presumably passed through verbatim to the underlying Kafka client
    // config — confirm, including whether these may override the typed fields above.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub extra_client_config: BTreeMap<String, String>,
    /// Batch size. Defaults to `faucet_core::DEFAULT_BATCH_SIZE` and is range-checked by
    /// `faucet_core::validate_batch_size` during validation.
    // NOTE(review): the unit tests treat `0` as an accepted "drain window" sentinel and
    // reject values above `faucet_core::MAX_BATCH_SIZE`; the exact meaning of `0` lives
    // in the consumer code — confirm there.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
}
75
/// Serde default for `KafkaSourceConfig::poll_timeout`: one second.
fn default_poll_timeout() -> Duration {
    Duration::from_millis(1_000)
}
79
/// Serde default for `KafkaSourceConfig::session_timeout`: thirty seconds.
fn default_session_timeout() -> Duration {
    Duration::new(30, 0)
}
83
/// Serde default for `KafkaSourceConfig::batch_size`: the shared
/// `faucet_core::DEFAULT_BATCH_SIZE` constant.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
87
/// Offset reset policy for the consumer.
///
/// Serialized in snake_case (`"earliest"` / `"latest"`); defaults to `Latest`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OffsetReset {
    // Serialized as "earliest".
    Earliest,
    // Serialized as "latest"; the `Default` variant, so an omitted config key yields this.
    #[default]
    Latest,
}
95
96impl OffsetReset {
97 #[allow(dead_code)] pub(crate) fn as_str(&self) -> &'static str {
99 match self {
100 OffsetReset::Earliest => "earliest",
101 OffsetReset::Latest => "latest",
102 }
103 }
104}
105
106impl KafkaSourceConfig {
107 pub fn validate(&self) -> Result<(), FaucetError> {
109 if self.brokers.trim().is_empty() {
110 return Err(FaucetError::Config(
111 "kafka source: brokers must not be empty".into(),
112 ));
113 }
114 if self.topics.is_empty() {
115 return Err(FaucetError::Config(
116 "kafka source: topics must contain at least one entry".into(),
117 ));
118 }
119 if self.group_id.trim().is_empty() {
120 return Err(FaucetError::Config(
121 "kafka source: group_id must not be empty".into(),
122 ));
123 }
124 if self.max_messages.is_none() && self.idle_timeout.is_none() {
125 return Err(FaucetError::Config(
126 "kafka source: at least one of max_messages or idle_timeout must be set".into(),
127 ));
128 }
129 faucet_core::validate_batch_size(self.batch_size)?;
130 Ok(())
131 }
132
133 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
142 self.batch_size = batch_size;
143 self
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    fn minimal_config() -> KafkaSourceConfig {
        KafkaSourceConfig {
            brokers: "localhost:9092".into(),
            topics: vec!["orders".into()],
            group_id: "test-group".into(),
            auth: KafkaAuth::None,
            value_format: KafkaValueFormat::Json,
            key_format: None,
            auto_offset_reset: OffsetReset::Latest,
            max_messages: Some(10),
            idle_timeout: None,
            poll_timeout: Duration::from_secs(1),
            session_timeout: Duration::from_secs(30),
            on_decode_error: OnDecodeError::Fail,
            extra_client_config: BTreeMap::new(),
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    #[test]
    fn validate_accepts_minimal_config() {
        assert!(minimal_config().validate().is_ok());
    }

    #[test]
    fn validate_rejects_empty_brokers() {
        let mut c = minimal_config();
        c.brokers = "".into();
        assert!(c.validate().is_err());
    }

    #[test]
    fn validate_rejects_whitespace_only_brokers() {
        let mut c = minimal_config();
        c.brokers = "   ".into();
        let err = c.validate().unwrap_err();
        assert!(matches!(err, FaucetError::Config(_)));
    }

    #[test]
    fn validate_rejects_empty_topics() {
        let mut c = minimal_config();
        c.topics = vec![];
        assert!(c.validate().is_err());
    }

    #[test]
    fn validate_rejects_empty_group_id() {
        let mut c = minimal_config();
        c.group_id = " ".into();
        assert!(c.validate().is_err());
    }

    #[test]
    fn validate_rejects_no_termination_condition() {
        let mut c = minimal_config();
        c.max_messages = None;
        c.idle_timeout = None;
        let err = c.validate().unwrap_err();
        assert!(format!("{err}").contains("max_messages or idle_timeout"));
    }

    #[test]
    fn validate_accepts_idle_timeout_only() {
        let mut c = minimal_config();
        c.max_messages = None;
        c.idle_timeout = Some(Duration::from_secs(5));
        assert!(c.validate().is_ok());
    }

    #[test]
    fn validate_accepts_both_termination_conditions() {
        let mut c = minimal_config();
        c.idle_timeout = Some(Duration::from_secs(5));
        assert!(c.validate().is_ok());
    }

    #[test]
    fn deserialize_from_yaml_like_json() {
        let j = json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "value_format": {"type": "json"},
            "max_messages": 100
        });
        let parsed: KafkaSourceConfig = serde_json::from_value(j).unwrap();
        assert_eq!(parsed.topics, vec!["t1"]);
        assert_eq!(parsed.max_messages, Some(100));
        assert!(matches!(parsed.auth, KafkaAuth::None));
        assert_eq!(parsed.auto_offset_reset, OffsetReset::Latest);
    }

    #[test]
    fn omitted_optional_fields_take_their_defaults() {
        let j = json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
        });
        let parsed: KafkaSourceConfig = serde_json::from_value(j).unwrap();
        assert_eq!(parsed.poll_timeout, Duration::from_secs(1));
        assert_eq!(parsed.session_timeout, Duration::from_secs(30));
        assert!(parsed.key_format.is_none());
        assert!(parsed.idle_timeout.is_none());
        assert!(parsed.extra_client_config.is_empty());
    }

    #[test]
    fn auto_offset_reset_deserializes_from_snake_case() {
        let j = json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
            "auto_offset_reset": "earliest",
        });
        let parsed: KafkaSourceConfig = serde_json::from_value(j).unwrap();
        assert_eq!(parsed.auto_offset_reset, OffsetReset::Earliest);
    }

    #[test]
    fn extra_client_config_deserializes_from_json_object() {
        let j = json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
            "extra_client_config": {"fetch.min.bytes": "1"},
        });
        let parsed: KafkaSourceConfig = serde_json::from_value(j).unwrap();
        assert_eq!(
            parsed.extra_client_config.get("fetch.min.bytes").map(String::as_str),
            Some("1")
        );
    }

    #[test]
    fn serialize_skips_unset_optional_fields() {
        let v = serde_json::to_value(minimal_config()).unwrap();
        assert!(v.get("key_format").is_none());
        assert!(v.get("idle_timeout").is_none());
        assert!(v.get("extra_client_config").is_none());
        assert_eq!(v["max_messages"].as_u64(), Some(10));
    }

    #[test]
    fn schema_for_config_compiles() {
        let _ = schemars::schema_for!(KafkaSourceConfig);
    }

    #[test]
    fn offset_reset_as_str() {
        assert_eq!(OffsetReset::Earliest.as_str(), "earliest");
        assert_eq!(OffsetReset::Latest.as_str(), "latest");
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let j = json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
        });
        let parsed: KafkaSourceConfig = serde_json::from_value(j).unwrap();
        assert_eq!(parsed.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = minimal_config().with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_drain_window_sentinel() {
        let config = minimal_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn validate_rejects_batch_size_above_max() {
        let config = minimal_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        let err = config.validate().unwrap_err();
        assert!(matches!(err, FaucetError::Config(_)));
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let j = json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
            "batch_size": 250,
        });
        let parsed: KafkaSourceConfig = serde_json::from_value(j).unwrap();
        assert_eq!(parsed.batch_size, 250);
    }
}