1use std::sync::Arc;
16use std::time::Duration;
17
18use crate::admin::KafkaAdmin;
19use crate::consumer::KafkaConsumer;
20use crate::network::TcpConnector;
21use crate::producer::KafkaProducer;
22use crate::{
23 AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
24 ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig, ValidationError,
25};
26use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
27
/// Poll timeout (one second) that a freshly created [`ConsumerBuilder`] starts with
/// until `with_poll_timeout` overrides it.
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(1_000);
29
30#[derive(Debug, Clone)]
31pub struct KafkaClient {
33 bootstrap_servers: Vec<String>,
34}
35
36impl KafkaClient {
37 pub fn new(bootstrap_server: impl Into<String>) -> Self {
39 Self {
40 bootstrap_servers: vec![bootstrap_server.into()],
41 }
42 }
43
44 pub fn with_bootstrap_servers(
46 mut self,
47 servers: impl IntoIterator<Item = impl Into<String>>,
48 ) -> Self {
49 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
50 self
51 }
52
53 pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
55 KafkaTopic {
56 bootstrap_servers: self.bootstrap_servers.clone(),
57 topic: topic.into(),
58 }
59 }
60
61 pub fn topics<I, S>(&self, topics: I) -> KafkaTopics
63 where
64 I: IntoIterator<Item = S>,
65 S: Into<String>,
66 {
67 KafkaTopics {
68 bootstrap_servers: self.bootstrap_servers.clone(),
69 topics: topics.into_iter().map(Into::into).collect(),
70 }
71 }
72
73 pub fn producer(&self) -> ProducerBuilder {
75 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
76 }
77
78 pub fn admin(&self) -> AdminBuilder {
80 AdminBuilder::from_servers(self.bootstrap_servers.clone())
81 }
82
83 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
85 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
86 }
87}
88
89#[derive(Debug, Clone)]
90pub struct KafkaTopic {
92 bootstrap_servers: Vec<String>,
93 topic: String,
94}
95
96impl KafkaTopic {
97 pub fn producer(&self) -> ProducerBuilder {
99 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
100 .with_default_topic(self.topic.clone())
101 }
102
103 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
105 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
106 .with_topic(self.topic.clone())
107 }
108}
109
110#[derive(Debug, Clone)]
111pub struct KafkaTopics {
113 bootstrap_servers: Vec<String>,
114 topics: Vec<String>,
115}
116
117impl KafkaTopics {
118 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
120 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
121 .with_topics(self.topics.clone())
122 }
123}
124
125#[derive(Debug, Clone)]
126pub struct ProducerBuilder {
128 config: ProducerConfig,
129 default_topic: Option<String>,
130 default_partition: Option<i32>,
131}
132
133impl ProducerBuilder {
134 fn new(bootstrap_server: impl Into<String>) -> Self {
135 Self {
136 config: ProducerConfig::new(bootstrap_server),
137 default_topic: None,
138 default_partition: None,
139 }
140 }
141
142 fn from_servers(servers: Vec<String>) -> Self {
143 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
144 builder.config = builder.config.with_bootstrap_servers(servers);
145 builder
146 }
147
148 pub fn with_bootstrap_servers(
150 mut self,
151 servers: impl IntoIterator<Item = impl Into<String>>,
152 ) -> Self {
153 self.config = self.config.with_bootstrap_servers(servers);
154 self
155 }
156
157 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
159 self.config = self.config.with_client_id(client_id);
160 self
161 }
162
163 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
165 self.config = self.config.with_security_protocol(security_protocol);
166 self
167 }
168
169 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
171 self.config = self.config.with_tls(tls);
172 self
173 }
174
175 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
177 self.config = self.config.with_sasl(sasl);
178 self
179 }
180
181 pub fn with_sasl_plain(
183 mut self,
184 username: impl Into<String>,
185 password: impl Into<String>,
186 ) -> Self {
187 self.config = self.config.with_sasl_plain(username, password);
188 self
189 }
190
191 pub fn with_sasl_scram_sha_256(
193 mut self,
194 username: impl Into<String>,
195 password: impl Into<String>,
196 ) -> Self {
197 self.config = self.config.with_sasl_scram_sha_256(username, password);
198 self
199 }
200
201 pub fn with_sasl_scram_sha_512(
203 mut self,
204 username: impl Into<String>,
205 password: impl Into<String>,
206 ) -> Self {
207 self.config = self.config.with_sasl_scram_sha_512(username, password);
208 self
209 }
210
211 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
213 self.config = self.config.with_compression(compression);
214 self
215 }
216
217 pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
219 self.default_topic = Some(topic.into());
220 self
221 }
222
223 pub fn with_default_partition(mut self, partition: i32) -> Self {
225 self.default_partition = Some(partition);
226 self
227 }
228
229 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
231 self.config = self.config.with_enable_idempotence(enable_idempotence);
232 self
233 }
234
235 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
237 self.config = self.config.with_batch_size(batch_size);
238 self
239 }
240
241 pub fn with_buffer_memory(mut self, buffer_memory: usize) -> Self {
243 self.config = self.config.with_buffer_memory(buffer_memory);
244 self
245 }
246
247 pub fn with_max_block(mut self, max_block: Duration) -> Self {
249 self.config = self.config.with_max_block(max_block);
250 self
251 }
252
253 pub fn with_max_request_size(mut self, max_request_size: usize) -> Self {
255 self.config = self.config.with_max_request_size(max_request_size);
256 self
257 }
258
259 pub fn with_partitioner_ignore_keys(mut self, ignore_keys: bool) -> Self {
261 self.config = self.config.with_partitioner_ignore_keys(ignore_keys);
262 self
263 }
264
265 pub fn with_linger(mut self, linger: Duration) -> Self {
267 self.config = self.config.with_linger(linger);
268 self
269 }
270
271 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
273 self.config = self.config.with_delivery_timeout(delivery_timeout);
274 self
275 }
276
277 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
279 self.config = self.config.with_request_timeout(request_timeout);
280 self
281 }
282
283 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
285 self.config = self.config.with_retry_backoff(retry_backoff);
286 self
287 }
288
289 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
291 self.config = self.config.with_max_retries(max_retries);
292 self
293 }
294
295 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
297 self.config = self
298 .config
299 .with_max_in_flight_requests_per_connection(max_in_flight);
300 self
301 }
302
303 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
305 self.config = self.config.with_transactional_id(transactional_id);
306 self
307 }
308
309 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
311 self.config = self.config.with_tcp_connector(tcp_connector);
312 self
313 }
314
315 pub async fn connect(self) -> Result<KafkaProducer> {
317 let producer = KafkaProducer::connect(self.config).await?;
318 Ok(producer.with_defaults(self.default_topic, self.default_partition))
319 }
320}
321
322#[derive(Debug, Clone)]
323pub struct AdminBuilder {
325 config: AdminConfig,
326}
327
328impl AdminBuilder {
329 fn new(bootstrap_server: impl Into<String>) -> Self {
330 Self {
331 config: AdminConfig::new(bootstrap_server),
332 }
333 }
334
335 fn from_servers(servers: Vec<String>) -> Self {
336 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
337 builder.config = builder.config.with_bootstrap_servers(servers);
338 builder
339 }
340
341 pub fn with_bootstrap_servers(
343 mut self,
344 servers: impl IntoIterator<Item = impl Into<String>>,
345 ) -> Self {
346 self.config = self.config.with_bootstrap_servers(servers);
347 self
348 }
349
350 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
352 self.config = self.config.with_client_id(client_id);
353 self
354 }
355
356 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
358 self.config = self.config.with_request_timeout(request_timeout);
359 self
360 }
361
362 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
364 self.config = self.config.with_security_protocol(security_protocol);
365 self
366 }
367
368 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
370 self.config = self.config.with_tls(tls);
371 self
372 }
373
374 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
376 self.config = self.config.with_sasl(sasl);
377 self
378 }
379
380 pub fn with_sasl_plain(
382 mut self,
383 username: impl Into<String>,
384 password: impl Into<String>,
385 ) -> Self {
386 self.config = self.config.with_sasl_plain(username, password);
387 self
388 }
389
390 pub fn with_sasl_scram_sha_256(
392 mut self,
393 username: impl Into<String>,
394 password: impl Into<String>,
395 ) -> Self {
396 self.config = self.config.with_sasl_scram_sha_256(username, password);
397 self
398 }
399
400 pub fn with_sasl_scram_sha_512(
402 mut self,
403 username: impl Into<String>,
404 password: impl Into<String>,
405 ) -> Self {
406 self.config = self.config.with_sasl_scram_sha_512(username, password);
407 self
408 }
409
410 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
412 self.config = self.config.with_tcp_connector(tcp_connector);
413 self
414 }
415
416 pub async fn connect(self) -> Result<KafkaAdmin> {
418 KafkaAdmin::connect(self.config).await
419 }
420}
421
422#[derive(Debug, Clone)]
423pub struct ConsumerBuilder {
425 config: ConsumerConfig,
426 topics: Vec<String>,
427 poll_timeout: Duration,
428}
429
430impl ConsumerBuilder {
431 fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
432 Self {
433 config: ConsumerConfig::new(bootstrap_server, group_id),
434 topics: Vec::new(),
435 poll_timeout: DEFAULT_POLL_TIMEOUT,
436 }
437 }
438
439 fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
440 let first = servers.first().cloned().unwrap_or_default();
441 let mut builder = Self::new(first, group_id);
442 builder.config = builder.config.with_bootstrap_servers(servers);
443 builder
444 }
445
446 pub fn with_bootstrap_servers(
448 mut self,
449 servers: impl IntoIterator<Item = impl Into<String>>,
450 ) -> Self {
451 self.config = self.config.with_bootstrap_servers(servers);
452 self
453 }
454
455 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
457 self.config = self.config.with_client_id(client_id);
458 self
459 }
460
461 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
463 self.config = self.config.with_security_protocol(security_protocol);
464 self
465 }
466
467 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
469 self.config = self.config.with_tls(tls);
470 self
471 }
472
473 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
475 self.config = self.config.with_sasl(sasl);
476 self
477 }
478
479 pub fn with_sasl_plain(
481 mut self,
482 username: impl Into<String>,
483 password: impl Into<String>,
484 ) -> Self {
485 self.config = self.config.with_sasl_plain(username, password);
486 self
487 }
488
489 pub fn with_sasl_scram_sha_256(
491 mut self,
492 username: impl Into<String>,
493 password: impl Into<String>,
494 ) -> Self {
495 self.config = self.config.with_sasl_scram_sha_256(username, password);
496 self
497 }
498
499 pub fn with_sasl_scram_sha_512(
501 mut self,
502 username: impl Into<String>,
503 password: impl Into<String>,
504 ) -> Self {
505 self.config = self.config.with_sasl_scram_sha_512(username, password);
506 self
507 }
508
509 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
511 self.topics.push(topic.into());
512 self
513 }
514
515 pub fn with_topics<I, S>(mut self, topics: I) -> Self
517 where
518 I: IntoIterator<Item = S>,
519 S: Into<String>,
520 {
521 self.topics.extend(topics.into_iter().map(Into::into));
522 self
523 }
524
525 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
527 self.config = self.config.with_auto_offset_reset(auto_offset_reset);
528 self
529 }
530
531 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
533 self.config = self.config.with_isolation_level(isolation_level);
534 self
535 }
536
537 pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
539 self.config = self.config.with_enable_auto_commit(enable_auto_commit);
540 self
541 }
542
543 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
545 self.config = self.config.with_request_timeout(request_timeout);
546 self
547 }
548
549 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
551 self.config = self.config.with_retry_backoff(retry_backoff);
552 self
553 }
554
555 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
557 self.config = self.config.with_max_retries(max_retries);
558 self
559 }
560
561 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
563 self.config = self.config.with_instance_id(instance_id);
564 self
565 }
566
567 pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
569 self.config = self.config.with_rebalance_listener(listener);
570 self
571 }
572
573 pub fn with_rebalance_callback(
575 mut self,
576 callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
577 ) -> Self {
578 self.config = self.config.with_rebalance_callback(callback);
579 self
580 }
581
582 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
584 self.config = self.config.with_tcp_connector(tcp_connector);
585 self
586 }
587
588 pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
590 self.poll_timeout = poll_timeout;
591 self
592 }
593
594 pub async fn connect(self) -> Result<KafkaConsumer> {
596 let topics = self
597 .topics
598 .into_iter()
599 .map(validate_topic_name)
600 .collect::<Result<Vec<_>>>()?;
601 let consumer = KafkaConsumer::connect(self.config).await?;
602 if !topics.is_empty()
603 && let Err(error) = consumer.subscribe(topics).await
604 {
605 let _ = consumer.shutdown().await;
606 return Err(error);
607 }
608 Ok(consumer.with_default_poll_timeout(self.poll_timeout))
609 }
610}
611
612fn validate_topic_name(topic: String) -> Result<String> {
613 let topic = topic.trim();
614 if topic.is_empty() {
615 return Err(ValidationError::EmptyTopicName {
616 operation: "topic builder",
617 }
618 .into());
619 }
620 Ok(topic.to_owned())
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SaslMechanism;

    // A topic facade must carry the client's full bootstrap list into the producer
    // builder, and each producer option must land on the config or the builder.
    #[test]
    fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
        let orders = KafkaClient::new("host-a:9092")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .topic("orders");

        let ProducerBuilder {
            config,
            default_topic,
            default_partition,
        } = orders
            .producer()
            .with_client_id("producer-a")
            .with_default_partition(2)
            .with_compression(ProducerCompression::Lz4)
            .with_batch_size(32)
            .with_buffer_memory(4096)
            .with_max_block(Duration::from_millis(250))
            .with_max_request_size(2048)
            .with_partitioner_ignore_keys(true)
            .with_linger(Duration::from_millis(10))
            .with_delivery_timeout(Duration::from_secs(3))
            .with_transactional_id("tx-a");

        assert_eq!(
            config.bootstrap_servers,
            vec!["host-a:9092", "host-b:9092"]
        );
        assert_eq!(config.client_id, "producer-a");
        assert_eq!(config.compression, ProducerCompression::Lz4);
        assert_eq!(config.batch_size, 32);
        assert_eq!(config.buffer_memory, 4096);
        assert_eq!(config.max_block, Duration::from_millis(250));
        assert_eq!(config.max_request_size, 2048);
        assert!(config.partitioner_ignore_keys);
        assert_eq!(config.linger, Duration::from_millis(10));
        assert_eq!(config.delivery_timeout, Duration::from_secs(3));
        assert_eq!(config.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(default_topic.as_deref(), Some("orders"));
        assert_eq!(default_partition, Some(2));
    }

    // Options set on the admin builder must be visible on its config, including the
    // bootstrap list that overrides the one inherited from the client.
    #[test]
    fn admin_builder_forwards_security_and_timeout_options() {
        let AdminBuilder { config } = KafkaClient::new("host-a:9092")
            .admin()
            .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
            .with_client_id("admin-a")
            .with_request_timeout(Duration::from_secs(9))
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls(TlsConfig::new().with_server_name("kafka.internal"))
            .with_sasl_scram_sha_512("user-a", "secret-a");

        assert_eq!(
            config.bootstrap_servers,
            vec!["host-b:9092", "host-c:9092"]
        );
        assert_eq!(config.client_id, "admin-a");
        assert_eq!(config.request_timeout, Duration::from_secs(9));
        assert_eq!(config.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));
        assert_eq!(config.sasl.mechanism, SaslMechanism::ScramSha512);
    }

    // Topics accumulate in insertion order (facade topic first) and group-level
    // options reach the consumer config.
    #[test]
    fn consumer_builder_collects_topics_and_group_options() {
        let ConsumerBuilder {
            config,
            topics,
            poll_timeout,
        } = KafkaClient::new("host-a:9092")
            .topic("orders")
            .consumer("group-a")
            .with_bootstrap_servers(["host-b:9092"])
            .with_client_id("consumer-a")
            .with_topic("payments")
            .with_topics(["shipments", "invoices"])
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_auto_commit(true)
            .with_instance_id("instance-a")
            .with_poll_timeout(Duration::from_millis(250))
            .with_sasl_plain("user-a", "secret-a");

        assert_eq!(config.bootstrap_servers, vec!["host-b:9092"]);
        assert_eq!(config.client_id, "consumer-a");
        assert_eq!(config.group_id, "group-a");
        assert_eq!(
            topics,
            vec!["orders", "payments", "shipments", "invoices"]
        );
        assert_eq!(config.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(config.isolation_level, IsolationLevel::ReadCommitted);
        assert!(config.enable_auto_commit);
        assert_eq!(config.instance_id.as_deref(), Some("instance-a"));
        assert_eq!(poll_timeout, Duration::from_millis(250));
        assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
    }

    // The multi-topic facade seeds the consumer builder with all of its topics and
    // later additions are appended after them.
    #[test]
    fn topics_facade_collects_multi_topic_consumer_defaults() {
        let facade = KafkaClient::new("host-a:9092")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .topics(["orders", "payments"]);

        let consumer = facade.consumer("group-a").with_topic("shipments");

        assert_eq!(
            consumer.config.bootstrap_servers,
            vec!["host-a:9092", "host-b:9092"]
        );
        assert_eq!(consumer.config.group_id, "group-a");
        assert_eq!(consumer.topics, vec!["orders", "payments", "shipments"]);
    }

    // Surrounding whitespace is stripped; a whitespace-only name is a validation error.
    #[test]
    fn validate_topic_name_trims_and_rejects_empty_names() {
        let trimmed = validate_topic_name(" topic-a ".to_owned()).unwrap();
        assert_eq!(trimmed, "topic-a");

        let blank = validate_topic_name(" ".to_owned());
        assert!(matches!(
            blank,
            Err(crate::Error::Validation(ValidationError::EmptyTopicName {
                operation: "topic builder"
            }))
        ));
    }
}