1use std::time::Duration;
16
17use anyhow::anyhow;
18
19use crate::admin::KafkaAdmin;
20use crate::consumer::KafkaConsumer;
21use crate::producer::KafkaProducer;
22use crate::{
23 AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
24 ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
25};
26
27const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
28
29#[derive(Debug, Clone)]
30pub struct KafkaClient {
32 bootstrap_servers: Vec<String>,
33}
34
35impl KafkaClient {
36 pub fn new(bootstrap_server: impl Into<String>) -> Self {
38 Self {
39 bootstrap_servers: vec![bootstrap_server.into()],
40 }
41 }
42
43 pub fn with_bootstrap_servers(
45 mut self,
46 servers: impl IntoIterator<Item = impl Into<String>>,
47 ) -> Self {
48 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
49 self
50 }
51
52 pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
54 KafkaTopic {
55 bootstrap_servers: self.bootstrap_servers.clone(),
56 topic: topic.into(),
57 }
58 }
59
60 pub fn producer(&self) -> ProducerBuilder {
62 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
63 }
64
65 pub fn admin(&self) -> AdminBuilder {
67 AdminBuilder::from_servers(self.bootstrap_servers.clone())
68 }
69
70 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
72 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
73 }
74}
75
76#[derive(Debug, Clone)]
77pub struct KafkaTopic {
79 bootstrap_servers: Vec<String>,
80 topic: String,
81}
82
83impl KafkaTopic {
84 pub fn producer(&self) -> ProducerBuilder {
86 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
87 .with_default_topic(self.topic.clone())
88 }
89
90 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
92 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
93 .with_topic(self.topic.clone())
94 }
95}
96
97#[derive(Debug, Clone)]
98pub struct ProducerBuilder {
100 config: ProducerConfig,
101 default_topic: Option<String>,
102 default_partition: Option<i32>,
103}
104
105impl ProducerBuilder {
106 fn new(bootstrap_server: impl Into<String>) -> Self {
107 Self {
108 config: ProducerConfig::new(bootstrap_server),
109 default_topic: None,
110 default_partition: None,
111 }
112 }
113
114 fn from_servers(servers: Vec<String>) -> Self {
115 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
116 builder.config = builder.config.with_bootstrap_servers(servers);
117 builder
118 }
119
120 pub fn with_bootstrap_servers(
122 mut self,
123 servers: impl IntoIterator<Item = impl Into<String>>,
124 ) -> Self {
125 self.config = self.config.with_bootstrap_servers(servers);
126 self
127 }
128
129 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
131 self.config = self.config.with_client_id(client_id);
132 self
133 }
134
135 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
137 self.config = self.config.with_security_protocol(security_protocol);
138 self
139 }
140
141 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
143 self.config = self.config.with_tls(tls);
144 self
145 }
146
147 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
149 self.config = self.config.with_sasl(sasl);
150 self
151 }
152
153 pub fn with_sasl_plain(
155 mut self,
156 username: impl Into<String>,
157 password: impl Into<String>,
158 ) -> Self {
159 self.config = self.config.with_sasl_plain(username, password);
160 self
161 }
162
163 pub fn with_sasl_scram_sha_256(
165 mut self,
166 username: impl Into<String>,
167 password: impl Into<String>,
168 ) -> Self {
169 self.config = self.config.with_sasl_scram_sha_256(username, password);
170 self
171 }
172
173 pub fn with_sasl_scram_sha_512(
175 mut self,
176 username: impl Into<String>,
177 password: impl Into<String>,
178 ) -> Self {
179 self.config = self.config.with_sasl_scram_sha_512(username, password);
180 self
181 }
182
183 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
185 self.config = self.config.with_compression(compression);
186 self
187 }
188
189 pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
191 self.default_topic = Some(topic.into());
192 self
193 }
194
195 pub fn with_default_partition(mut self, partition: i32) -> Self {
197 self.default_partition = Some(partition);
198 self
199 }
200
201 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
203 self.config = self.config.with_enable_idempotence(enable_idempotence);
204 self
205 }
206
207 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
209 self.config = self.config.with_batch_size(batch_size);
210 self
211 }
212
213 pub fn with_linger(mut self, linger: Duration) -> Self {
215 self.config = self.config.with_linger(linger);
216 self
217 }
218
219 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
221 self.config = self.config.with_delivery_timeout(delivery_timeout);
222 self
223 }
224
225 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
227 self.config = self
228 .config
229 .with_max_in_flight_requests_per_connection(max_in_flight);
230 self
231 }
232
233 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
235 self.config = self.config.with_transactional_id(transactional_id);
236 self
237 }
238
239 pub async fn connect(self) -> Result<KafkaProducer> {
241 let producer = KafkaProducer::connect(self.config).await?;
242 Ok(producer.with_defaults(self.default_topic, self.default_partition))
243 }
244}
245
246#[derive(Debug, Clone)]
247pub struct AdminBuilder {
249 config: AdminConfig,
250}
251
252impl AdminBuilder {
253 fn new(bootstrap_server: impl Into<String>) -> Self {
254 Self {
255 config: AdminConfig::new(bootstrap_server),
256 }
257 }
258
259 fn from_servers(servers: Vec<String>) -> Self {
260 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
261 builder.config = builder.config.with_bootstrap_servers(servers);
262 builder
263 }
264
265 pub fn with_bootstrap_servers(
267 mut self,
268 servers: impl IntoIterator<Item = impl Into<String>>,
269 ) -> Self {
270 self.config = self.config.with_bootstrap_servers(servers);
271 self
272 }
273
274 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
276 self.config = self.config.with_client_id(client_id);
277 self
278 }
279
280 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
282 self.config = self.config.with_request_timeout(request_timeout);
283 self
284 }
285
286 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
288 self.config = self.config.with_security_protocol(security_protocol);
289 self
290 }
291
292 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
294 self.config = self.config.with_tls(tls);
295 self
296 }
297
298 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
300 self.config = self.config.with_sasl(sasl);
301 self
302 }
303
304 pub fn with_sasl_plain(
306 mut self,
307 username: impl Into<String>,
308 password: impl Into<String>,
309 ) -> Self {
310 self.config = self.config.with_sasl_plain(username, password);
311 self
312 }
313
314 pub fn with_sasl_scram_sha_256(
316 mut self,
317 username: impl Into<String>,
318 password: impl Into<String>,
319 ) -> Self {
320 self.config = self.config.with_sasl_scram_sha_256(username, password);
321 self
322 }
323
324 pub fn with_sasl_scram_sha_512(
326 mut self,
327 username: impl Into<String>,
328 password: impl Into<String>,
329 ) -> Self {
330 self.config = self.config.with_sasl_scram_sha_512(username, password);
331 self
332 }
333
334 pub async fn connect(self) -> Result<KafkaAdmin> {
336 KafkaAdmin::connect(self.config).await
337 }
338}
339
340#[derive(Debug, Clone)]
341pub struct ConsumerBuilder {
343 config: ConsumerConfig,
344 topics: Vec<String>,
345 poll_timeout: Duration,
346}
347
348impl ConsumerBuilder {
349 fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
350 Self {
351 config: ConsumerConfig::new(bootstrap_server, group_id),
352 topics: Vec::new(),
353 poll_timeout: DEFAULT_POLL_TIMEOUT,
354 }
355 }
356
357 fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
358 let first = servers.first().cloned().unwrap_or_default();
359 let mut builder = Self::new(first, group_id);
360 builder.config = builder.config.with_bootstrap_servers(servers);
361 builder
362 }
363
364 pub fn with_bootstrap_servers(
366 mut self,
367 servers: impl IntoIterator<Item = impl Into<String>>,
368 ) -> Self {
369 self.config = self.config.with_bootstrap_servers(servers);
370 self
371 }
372
373 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
375 self.config = self.config.with_client_id(client_id);
376 self
377 }
378
379 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
381 self.config = self.config.with_security_protocol(security_protocol);
382 self
383 }
384
385 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
387 self.config = self.config.with_tls(tls);
388 self
389 }
390
391 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
393 self.config = self.config.with_sasl(sasl);
394 self
395 }
396
397 pub fn with_sasl_plain(
399 mut self,
400 username: impl Into<String>,
401 password: impl Into<String>,
402 ) -> Self {
403 self.config = self.config.with_sasl_plain(username, password);
404 self
405 }
406
407 pub fn with_sasl_scram_sha_256(
409 mut self,
410 username: impl Into<String>,
411 password: impl Into<String>,
412 ) -> Self {
413 self.config = self.config.with_sasl_scram_sha_256(username, password);
414 self
415 }
416
417 pub fn with_sasl_scram_sha_512(
419 mut self,
420 username: impl Into<String>,
421 password: impl Into<String>,
422 ) -> Self {
423 self.config = self.config.with_sasl_scram_sha_512(username, password);
424 self
425 }
426
427 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
429 self.topics.push(topic.into());
430 self
431 }
432
433 pub fn with_topics<I, S>(mut self, topics: I) -> Self
435 where
436 I: IntoIterator<Item = S>,
437 S: Into<String>,
438 {
439 self.topics.extend(topics.into_iter().map(Into::into));
440 self
441 }
442
443 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
445 self.config = self.config.with_auto_offset_reset(auto_offset_reset);
446 self
447 }
448
449 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
451 self.config = self.config.with_isolation_level(isolation_level);
452 self
453 }
454
455 pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
457 self.config = self.config.with_enable_auto_commit(enable_auto_commit);
458 self
459 }
460
461 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
463 self.config = self.config.with_instance_id(instance_id);
464 self
465 }
466
467 pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
469 self.poll_timeout = poll_timeout;
470 self
471 }
472
473 pub async fn connect(self) -> Result<KafkaConsumer> {
475 let topics = self
476 .topics
477 .into_iter()
478 .map(validate_topic_name)
479 .collect::<Result<Vec<_>>>()?;
480 let consumer = KafkaConsumer::connect(self.config).await?;
481 if !topics.is_empty()
482 && let Err(error) = consumer.subscribe(topics).await
483 {
484 let _ = consumer.shutdown().await;
485 return Err(error);
486 }
487 Ok(consumer.with_default_poll_timeout(self.poll_timeout))
488 }
489}
490
491fn validate_topic_name(topic: String) -> Result<String> {
492 let topic = topic.trim();
493 if topic.is_empty() {
494 return Err(anyhow!("topic must be non-empty").into());
495 }
496 Ok(topic.to_owned())
497}
498
499#[cfg(test)]
500mod tests {
501 use super::*;
502 use crate::SaslMechanism;
503
504 #[test]
505 fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
506 let client =
507 KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);
508
509 let producer = client
510 .topic("orders")
511 .producer()
512 .with_client_id("producer-a")
513 .with_default_partition(2)
514 .with_compression(ProducerCompression::Lz4)
515 .with_batch_size(32)
516 .with_linger(Duration::from_millis(10))
517 .with_delivery_timeout(Duration::from_secs(3))
518 .with_transactional_id("tx-a");
519
520 assert_eq!(
521 producer.config.bootstrap_servers,
522 vec!["host-a:9092", "host-b:9092"]
523 );
524 assert_eq!(producer.config.client_id, "producer-a");
525 assert_eq!(producer.config.compression, ProducerCompression::Lz4);
526 assert_eq!(producer.config.batch_size, 32);
527 assert_eq!(producer.config.linger, Duration::from_millis(10));
528 assert_eq!(producer.config.delivery_timeout, Duration::from_secs(3));
529 assert_eq!(producer.config.transactional_id.as_deref(), Some("tx-a"));
530 assert_eq!(producer.default_topic.as_deref(), Some("orders"));
531 assert_eq!(producer.default_partition, Some(2));
532 }
533
534 #[test]
535 fn admin_builder_forwards_security_and_timeout_options() {
536 let builder = KafkaClient::new("host-a:9092")
537 .admin()
538 .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
539 .with_client_id("admin-a")
540 .with_request_timeout(Duration::from_secs(9))
541 .with_security_protocol(SecurityProtocol::Ssl)
542 .with_tls(TlsConfig::new().with_server_name("kafka.internal"))
543 .with_sasl_scram_sha_512("user-a", "secret-a");
544
545 assert_eq!(
546 builder.config.bootstrap_servers,
547 vec!["host-b:9092", "host-c:9092"]
548 );
549 assert_eq!(builder.config.client_id, "admin-a");
550 assert_eq!(builder.config.request_timeout, Duration::from_secs(9));
551 assert_eq!(builder.config.security_protocol, SecurityProtocol::SaslSsl);
552 assert_eq!(
553 builder.config.tls.server_name.as_deref(),
554 Some("kafka.internal")
555 );
556 assert_eq!(builder.config.sasl.mechanism, SaslMechanism::ScramSha512);
557 }
558
559 #[test]
560 fn consumer_builder_collects_topics_and_group_options() {
561 let builder = KafkaClient::new("host-a:9092")
562 .topic("orders")
563 .consumer("group-a")
564 .with_bootstrap_servers(["host-b:9092"])
565 .with_client_id("consumer-a")
566 .with_topic("payments")
567 .with_topics(["shipments", "invoices"])
568 .with_auto_offset_reset(AutoOffsetReset::Latest)
569 .with_isolation_level(IsolationLevel::ReadCommitted)
570 .with_auto_commit(true)
571 .with_instance_id("instance-a")
572 .with_poll_timeout(Duration::from_millis(250))
573 .with_sasl_plain("user-a", "secret-a");
574
575 assert_eq!(builder.config.bootstrap_servers, vec!["host-b:9092"]);
576 assert_eq!(builder.config.client_id, "consumer-a");
577 assert_eq!(builder.config.group_id, "group-a");
578 assert_eq!(
579 builder.topics,
580 vec!["orders", "payments", "shipments", "invoices"]
581 );
582 assert_eq!(builder.config.auto_offset_reset, AutoOffsetReset::Latest);
583 assert_eq!(
584 builder.config.isolation_level,
585 IsolationLevel::ReadCommitted
586 );
587 assert!(builder.config.enable_auto_commit);
588 assert_eq!(builder.config.instance_id.as_deref(), Some("instance-a"));
589 assert_eq!(builder.poll_timeout, Duration::from_millis(250));
590 assert_eq!(
591 builder.config.security_protocol,
592 SecurityProtocol::SaslPlaintext
593 );
594 }
595
596 #[test]
597 fn validate_topic_name_trims_and_rejects_empty_names() {
598 assert_eq!(
599 validate_topic_name(" topic-a ".to_owned()).unwrap(),
600 "topic-a"
601 );
602 assert!(validate_topic_name(" ".to_owned()).is_err());
603 }
604}