Skip to main content

faucet_source_kafka/
config.rs

1//! Configuration types for the Kafka source.
2
3use faucet_common_kafka::{KafkaAuth, KafkaValueFormat, OnDecodeError};
4use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::time::Duration;
9
/// Configuration for the Kafka source.
///
/// `brokers`, `topics` and `group_id` carry no serde default and therefore
/// must be present when deserializing; every other field falls back to a
/// default. Deserialization itself does not check the values — call
/// [`KafkaSourceConfig::validate`] for that.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct KafkaSourceConfig {
    /// Comma-separated bootstrap server list, e.g. `"broker1:9092,broker2:9092"`.
    pub brokers: String,
    /// One or more topics to subscribe to.
    pub topics: Vec<String>,
    /// Consumer group ID; required by librdkafka and used for partition assignment.
    pub group_id: String,
    /// Authentication settings. Falls back to `KafkaAuth::default()` when omitted.
    #[serde(default)]
    pub auth: KafkaAuth,
    /// Format for the message value. Falls back to
    /// `KafkaValueFormat::default()` when omitted.
    #[serde(default)]
    pub value_format: KafkaValueFormat,
    /// Format for the message key. `None` (the default) decodes key bytes as
    /// UTF-8 (or `null` if there is no key on the message).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub key_format: Option<KafkaValueFormat>,
    /// Offset reset policy. Defaults to [`OffsetReset::Latest`] when omitted.
    #[serde(default)]
    pub auto_offset_reset: OffsetReset,
    /// Stop after this many messages have been consumed.
    /// At least one of `max_messages` and `idle_timeout` must be set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_messages: Option<usize>,
    /// Stop after this many seconds of no new messages.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "faucet_core::config::duration_secs_option"
    )]
    // The schema advertises an optional integer instead of `Duration`,
    // presumably mirroring what the `duration_secs_option` adapter reads/writes.
    #[schemars(with = "Option<u64>")]
    pub idle_timeout: Option<Duration>,
    /// Single-poll timeout (advisory). Default 1 second.
    #[serde(
        default = "default_poll_timeout",
        with = "faucet_core::config::duration_secs"
    )]
    // Schema type overridden to an integer; presumably whole seconds, per the
    // `duration_secs` adapter's name — confirm in `faucet_core::config`.
    #[schemars(with = "u64")]
    pub poll_timeout: Duration,
    /// Kafka `session.timeout.ms`. Default 30 seconds.
    #[serde(
        default = "default_session_timeout",
        with = "faucet_core::config::duration_secs"
    )]
    // Same integer schema override as `poll_timeout`.
    #[schemars(with = "u64")]
    pub session_timeout: Duration,
    /// What to do when a message cannot be decoded (see [`OnDecodeError`]).
    /// Falls back to `OnDecodeError::default()` when omitted.
    #[serde(default)]
    pub on_decode_error: OnDecodeError,
    /// Raw librdkafka client properties to pass through. Use with care —
    /// these can override anything set by `auth` or the typed fields above.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub extra_client_config: BTreeMap<String, String>,
    /// Messages per emitted [`StreamPage`](faucet_core::StreamPage). Messages
    /// drained from the consumer are accumulated into an in-memory buffer and
    /// yielded whenever the buffer reaches this size or the idle window flushes
    /// a partially-filled buffer. Defaults to [`DEFAULT_BATCH_SIZE`].
    ///
    /// `batch_size = 0` is the "drain-entire-run-window" sentinel: the source
    /// accumulates **every** message produced by the run (until `max_messages`
    /// or `idle_timeout` fires) into a single page before yielding. This
    /// defeats the point of streaming and exists only for tests or one-shot
    /// drain scenarios; prefer a finite `batch_size` for production pipelines
    /// so each batch is durably committed via the state store as soon as the
    /// sink confirms the write.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
}
75
/// Serde default for `KafkaSourceConfig::poll_timeout`: one second.
fn default_poll_timeout() -> Duration {
    const POLL_TIMEOUT_SECS: u64 = 1;
    Duration::from_secs(POLL_TIMEOUT_SECS)
}
79
/// Serde default for `KafkaSourceConfig::session_timeout`: thirty seconds.
fn default_session_timeout() -> Duration {
    const SESSION_TIMEOUT_SECS: u64 = 30;
    Duration::from_secs(SESSION_TIMEOUT_SECS)
}
83
84fn default_batch_size() -> usize {
85    DEFAULT_BATCH_SIZE
86}
87
/// Offset reset policy selected by `KafkaSourceConfig::auto_offset_reset`.
///
/// Serialized in snake_case, i.e. as `"earliest"` or `"latest"`.
/// [`OffsetReset::Latest`] is the `Default`, so it is also what an omitted
/// `auto_offset_reset` field deserializes to.
// NOTE(review): presumably maps onto Kafka's `auto.offset.reset` consumer
// property — confirm against `KafkaSource::new`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OffsetReset {
    Earliest,
    // Marked `#[default]`: this is the value `OffsetReset::default()` yields.
    #[default]
    Latest,
}
95
96impl OffsetReset {
97    #[allow(dead_code)] // used by KafkaSource::new in Task 13
98    pub(crate) fn as_str(&self) -> &'static str {
99        match self {
100            OffsetReset::Earliest => "earliest",
101            OffsetReset::Latest => "latest",
102        }
103    }
104}
105
106impl KafkaSourceConfig {
107    /// Validate the config at construction time. Called by `KafkaSource::new`.
108    pub fn validate(&self) -> Result<(), FaucetError> {
109        if self.brokers.trim().is_empty() {
110            return Err(FaucetError::Config(
111                "kafka source: brokers must not be empty".into(),
112            ));
113        }
114        if self.topics.is_empty() {
115            return Err(FaucetError::Config(
116                "kafka source: topics must contain at least one entry".into(),
117            ));
118        }
119        if self.group_id.trim().is_empty() {
120            return Err(FaucetError::Config(
121                "kafka source: group_id must not be empty".into(),
122            ));
123        }
124        if self.max_messages.is_none() && self.idle_timeout.is_none() {
125            return Err(FaucetError::Config(
126                "kafka source: at least one of max_messages or idle_timeout must be set".into(),
127            ));
128        }
129        faucet_core::validate_batch_size(self.batch_size)?;
130        Ok(())
131    }
132
133    /// Set the per-page message count for
134    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
135    ///
136    /// Pass `0` to drain the entire run window (until `max_messages` or
137    /// `idle_timeout` fires) into a single [`StreamPage`](faucet_core::StreamPage).
138    /// This is only useful for tests or one-shot drain scenarios — it negates
139    /// the streaming benefit of incremental sink writes plus per-page
140    /// bookmark persistence.
141    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
142        self.batch_size = batch_size;
143        self
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Smallest config that passes `validate`: one broker, one topic, a group
    /// id, and `max_messages` as the only termination condition.
    fn minimal_config() -> KafkaSourceConfig {
        KafkaSourceConfig {
            brokers: String::from("localhost:9092"),
            topics: vec![String::from("orders")],
            group_id: String::from("test-group"),
            auth: KafkaAuth::None,
            value_format: KafkaValueFormat::Json,
            key_format: None,
            auto_offset_reset: OffsetReset::Latest,
            max_messages: Some(10),
            idle_timeout: None,
            poll_timeout: Duration::from_secs(1),
            session_timeout: Duration::from_secs(30),
            on_decode_error: OnDecodeError::Fail,
            extra_client_config: BTreeMap::new(),
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    /// `minimal_config()` with `tweak` applied to it.
    fn config_with(tweak: impl FnOnce(&mut KafkaSourceConfig)) -> KafkaSourceConfig {
        let mut config = minimal_config();
        tweak(&mut config);
        config
    }

    /// Deserializes `value` into a config, panicking if serde rejects it.
    fn parse(value: serde_json::Value) -> KafkaSourceConfig {
        serde_json::from_value(value).expect("config JSON should deserialize")
    }

    #[test]
    fn validate_accepts_minimal_config() {
        let outcome = minimal_config().validate();
        assert!(matches!(outcome, Ok(())));
    }

    #[test]
    fn validate_rejects_empty_brokers() {
        let config = config_with(|c| c.brokers = String::new());
        assert!(matches!(config.validate(), Err(_)));
    }

    #[test]
    fn validate_rejects_empty_topics() {
        let config = config_with(|c| c.topics = Vec::new());
        assert!(matches!(config.validate(), Err(_)));
    }

    #[test]
    fn validate_rejects_empty_group_id() {
        // Whitespace-only counts as empty.
        let config = config_with(|c| c.group_id = String::from("  "));
        assert!(matches!(config.validate(), Err(_)));
    }

    #[test]
    fn validate_rejects_no_termination_condition() {
        let config = config_with(|c| {
            c.max_messages = None;
            c.idle_timeout = None;
        });
        let message = config
            .validate()
            .expect_err("a config without a termination condition must be rejected")
            .to_string();
        assert!(message.contains("max_messages or idle_timeout"));
    }

    #[test]
    fn validate_accepts_idle_timeout_only() {
        let config = config_with(|c| {
            c.max_messages = None;
            c.idle_timeout = Some(Duration::from_secs(5));
        });
        assert!(matches!(config.validate(), Ok(())));
    }

    #[test]
    fn deserialize_from_yaml_like_json() {
        let parsed = parse(json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "value_format": {"type": "json"},
            "max_messages": 100
        }));
        assert_eq!(parsed.topics, ["t1"]);
        assert_eq!(parsed.max_messages, Some(100));
        // Fields absent from the JSON fall back to their defaults.
        assert!(matches!(parsed.auth, KafkaAuth::None));
        assert_eq!(parsed.auto_offset_reset, OffsetReset::Latest);
    }

    #[test]
    fn schema_for_config_compiles() {
        // Only checks that schema generation does not panic.
        let _schema = schemars::schema_for!(KafkaSourceConfig);
    }

    #[test]
    fn offset_reset_as_str() {
        let expected = [
            (OffsetReset::Earliest, "earliest"),
            (OffsetReset::Latest, "latest"),
        ];
        for (variant, name) in expected {
            assert_eq!(variant.as_str(), name);
        }
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let parsed = parse(json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
        }));
        assert_eq!(parsed.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let overridden = minimal_config().with_batch_size(500);
        assert_eq!(overridden.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_drain_window_sentinel() {
        let drain_all = minimal_config().with_batch_size(0);
        assert_eq!(drain_all.batch_size, 0);
        assert!(matches!(drain_all.validate(), Ok(())));
    }

    #[test]
    fn validate_rejects_batch_size_above_max() {
        let oversized = minimal_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(matches!(oversized.validate(), Err(FaucetError::Config(_))));
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let parsed = parse(json!({
            "brokers": "broker:9092",
            "topics": ["t1"],
            "group_id": "g",
            "max_messages": 100,
            "batch_size": 250,
        }));
        assert_eq!(parsed.batch_size, 250);
    }
}