1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19
20use crate::admin::KafkaAdmin;
21use crate::consumer::KafkaConsumer;
22use crate::network::TcpConnector;
23use crate::producer::KafkaProducer;
24use crate::{
25 AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
26 ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
27};
28use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
29
30const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
31
32#[derive(Debug, Clone)]
33pub struct KafkaClient {
35 bootstrap_servers: Vec<String>,
36}
37
38impl KafkaClient {
39 pub fn new(bootstrap_server: impl Into<String>) -> Self {
41 Self {
42 bootstrap_servers: vec![bootstrap_server.into()],
43 }
44 }
45
46 pub fn with_bootstrap_servers(
48 mut self,
49 servers: impl IntoIterator<Item = impl Into<String>>,
50 ) -> Self {
51 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
52 self
53 }
54
55 pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
57 KafkaTopic {
58 bootstrap_servers: self.bootstrap_servers.clone(),
59 topic: topic.into(),
60 }
61 }
62
63 pub fn topics<I, S>(&self, topics: I) -> KafkaTopics
65 where
66 I: IntoIterator<Item = S>,
67 S: Into<String>,
68 {
69 KafkaTopics {
70 bootstrap_servers: self.bootstrap_servers.clone(),
71 topics: topics.into_iter().map(Into::into).collect(),
72 }
73 }
74
75 pub fn producer(&self) -> ProducerBuilder {
77 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
78 }
79
80 pub fn admin(&self) -> AdminBuilder {
82 AdminBuilder::from_servers(self.bootstrap_servers.clone())
83 }
84
85 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
87 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
88 }
89}
90
91#[derive(Debug, Clone)]
92pub struct KafkaTopic {
94 bootstrap_servers: Vec<String>,
95 topic: String,
96}
97
98impl KafkaTopic {
99 pub fn producer(&self) -> ProducerBuilder {
101 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
102 .with_default_topic(self.topic.clone())
103 }
104
105 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
107 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
108 .with_topic(self.topic.clone())
109 }
110}
111
112#[derive(Debug, Clone)]
113pub struct KafkaTopics {
115 bootstrap_servers: Vec<String>,
116 topics: Vec<String>,
117}
118
119impl KafkaTopics {
120 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
122 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
123 .with_topics(self.topics.clone())
124 }
125}
126
127#[derive(Debug, Clone)]
128pub struct ProducerBuilder {
130 config: ProducerConfig,
131 default_topic: Option<String>,
132 default_partition: Option<i32>,
133}
134
135impl ProducerBuilder {
136 fn new(bootstrap_server: impl Into<String>) -> Self {
137 Self {
138 config: ProducerConfig::new(bootstrap_server),
139 default_topic: None,
140 default_partition: None,
141 }
142 }
143
144 fn from_servers(servers: Vec<String>) -> Self {
145 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
146 builder.config = builder.config.with_bootstrap_servers(servers);
147 builder
148 }
149
150 pub fn with_bootstrap_servers(
152 mut self,
153 servers: impl IntoIterator<Item = impl Into<String>>,
154 ) -> Self {
155 self.config = self.config.with_bootstrap_servers(servers);
156 self
157 }
158
159 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
161 self.config = self.config.with_client_id(client_id);
162 self
163 }
164
165 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
167 self.config = self.config.with_security_protocol(security_protocol);
168 self
169 }
170
171 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
173 self.config = self.config.with_tls(tls);
174 self
175 }
176
177 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
179 self.config = self.config.with_sasl(sasl);
180 self
181 }
182
183 pub fn with_sasl_plain(
185 mut self,
186 username: impl Into<String>,
187 password: impl Into<String>,
188 ) -> Self {
189 self.config = self.config.with_sasl_plain(username, password);
190 self
191 }
192
193 pub fn with_sasl_scram_sha_256(
195 mut self,
196 username: impl Into<String>,
197 password: impl Into<String>,
198 ) -> Self {
199 self.config = self.config.with_sasl_scram_sha_256(username, password);
200 self
201 }
202
203 pub fn with_sasl_scram_sha_512(
205 mut self,
206 username: impl Into<String>,
207 password: impl Into<String>,
208 ) -> Self {
209 self.config = self.config.with_sasl_scram_sha_512(username, password);
210 self
211 }
212
213 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
215 self.config = self.config.with_compression(compression);
216 self
217 }
218
219 pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
221 self.default_topic = Some(topic.into());
222 self
223 }
224
225 pub fn with_default_partition(mut self, partition: i32) -> Self {
227 self.default_partition = Some(partition);
228 self
229 }
230
231 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
233 self.config = self.config.with_enable_idempotence(enable_idempotence);
234 self
235 }
236
237 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
239 self.config = self.config.with_batch_size(batch_size);
240 self
241 }
242
243 pub fn with_linger(mut self, linger: Duration) -> Self {
245 self.config = self.config.with_linger(linger);
246 self
247 }
248
249 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
251 self.config = self.config.with_delivery_timeout(delivery_timeout);
252 self
253 }
254
255 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
257 self.config = self.config.with_request_timeout(request_timeout);
258 self
259 }
260
261 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
263 self.config = self.config.with_retry_backoff(retry_backoff);
264 self
265 }
266
267 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
269 self.config = self.config.with_max_retries(max_retries);
270 self
271 }
272
273 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
275 self.config = self
276 .config
277 .with_max_in_flight_requests_per_connection(max_in_flight);
278 self
279 }
280
281 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
283 self.config = self.config.with_transactional_id(transactional_id);
284 self
285 }
286
287 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
289 self.config = self.config.with_tcp_connector(tcp_connector);
290 self
291 }
292
293 pub async fn connect(self) -> Result<KafkaProducer> {
295 let producer = KafkaProducer::connect(self.config).await?;
296 Ok(producer.with_defaults(self.default_topic, self.default_partition))
297 }
298}
299
300#[derive(Debug, Clone)]
301pub struct AdminBuilder {
303 config: AdminConfig,
304}
305
306impl AdminBuilder {
307 fn new(bootstrap_server: impl Into<String>) -> Self {
308 Self {
309 config: AdminConfig::new(bootstrap_server),
310 }
311 }
312
313 fn from_servers(servers: Vec<String>) -> Self {
314 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
315 builder.config = builder.config.with_bootstrap_servers(servers);
316 builder
317 }
318
319 pub fn with_bootstrap_servers(
321 mut self,
322 servers: impl IntoIterator<Item = impl Into<String>>,
323 ) -> Self {
324 self.config = self.config.with_bootstrap_servers(servers);
325 self
326 }
327
328 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
330 self.config = self.config.with_client_id(client_id);
331 self
332 }
333
334 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
336 self.config = self.config.with_request_timeout(request_timeout);
337 self
338 }
339
340 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
342 self.config = self.config.with_security_protocol(security_protocol);
343 self
344 }
345
346 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
348 self.config = self.config.with_tls(tls);
349 self
350 }
351
352 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
354 self.config = self.config.with_sasl(sasl);
355 self
356 }
357
358 pub fn with_sasl_plain(
360 mut self,
361 username: impl Into<String>,
362 password: impl Into<String>,
363 ) -> Self {
364 self.config = self.config.with_sasl_plain(username, password);
365 self
366 }
367
368 pub fn with_sasl_scram_sha_256(
370 mut self,
371 username: impl Into<String>,
372 password: impl Into<String>,
373 ) -> Self {
374 self.config = self.config.with_sasl_scram_sha_256(username, password);
375 self
376 }
377
378 pub fn with_sasl_scram_sha_512(
380 mut self,
381 username: impl Into<String>,
382 password: impl Into<String>,
383 ) -> Self {
384 self.config = self.config.with_sasl_scram_sha_512(username, password);
385 self
386 }
387
388 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
390 self.config = self.config.with_tcp_connector(tcp_connector);
391 self
392 }
393
394 pub async fn connect(self) -> Result<KafkaAdmin> {
396 KafkaAdmin::connect(self.config).await
397 }
398}
399
400#[derive(Debug, Clone)]
401pub struct ConsumerBuilder {
403 config: ConsumerConfig,
404 topics: Vec<String>,
405 poll_timeout: Duration,
406}
407
408impl ConsumerBuilder {
409 fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
410 Self {
411 config: ConsumerConfig::new(bootstrap_server, group_id),
412 topics: Vec::new(),
413 poll_timeout: DEFAULT_POLL_TIMEOUT,
414 }
415 }
416
417 fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
418 let first = servers.first().cloned().unwrap_or_default();
419 let mut builder = Self::new(first, group_id);
420 builder.config = builder.config.with_bootstrap_servers(servers);
421 builder
422 }
423
424 pub fn with_bootstrap_servers(
426 mut self,
427 servers: impl IntoIterator<Item = impl Into<String>>,
428 ) -> Self {
429 self.config = self.config.with_bootstrap_servers(servers);
430 self
431 }
432
433 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
435 self.config = self.config.with_client_id(client_id);
436 self
437 }
438
439 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
441 self.config = self.config.with_security_protocol(security_protocol);
442 self
443 }
444
445 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
447 self.config = self.config.with_tls(tls);
448 self
449 }
450
451 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
453 self.config = self.config.with_sasl(sasl);
454 self
455 }
456
457 pub fn with_sasl_plain(
459 mut self,
460 username: impl Into<String>,
461 password: impl Into<String>,
462 ) -> Self {
463 self.config = self.config.with_sasl_plain(username, password);
464 self
465 }
466
467 pub fn with_sasl_scram_sha_256(
469 mut self,
470 username: impl Into<String>,
471 password: impl Into<String>,
472 ) -> Self {
473 self.config = self.config.with_sasl_scram_sha_256(username, password);
474 self
475 }
476
477 pub fn with_sasl_scram_sha_512(
479 mut self,
480 username: impl Into<String>,
481 password: impl Into<String>,
482 ) -> Self {
483 self.config = self.config.with_sasl_scram_sha_512(username, password);
484 self
485 }
486
487 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
489 self.topics.push(topic.into());
490 self
491 }
492
493 pub fn with_topics<I, S>(mut self, topics: I) -> Self
495 where
496 I: IntoIterator<Item = S>,
497 S: Into<String>,
498 {
499 self.topics.extend(topics.into_iter().map(Into::into));
500 self
501 }
502
503 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
505 self.config = self.config.with_auto_offset_reset(auto_offset_reset);
506 self
507 }
508
509 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
511 self.config = self.config.with_isolation_level(isolation_level);
512 self
513 }
514
515 pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
517 self.config = self.config.with_enable_auto_commit(enable_auto_commit);
518 self
519 }
520
521 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
523 self.config = self.config.with_request_timeout(request_timeout);
524 self
525 }
526
527 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
529 self.config = self.config.with_retry_backoff(retry_backoff);
530 self
531 }
532
533 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
535 self.config = self.config.with_max_retries(max_retries);
536 self
537 }
538
539 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
541 self.config = self.config.with_instance_id(instance_id);
542 self
543 }
544
545 pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
547 self.config = self.config.with_rebalance_listener(listener);
548 self
549 }
550
551 pub fn with_rebalance_callback(
553 mut self,
554 callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
555 ) -> Self {
556 self.config = self.config.with_rebalance_callback(callback);
557 self
558 }
559
560 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
562 self.config = self.config.with_tcp_connector(tcp_connector);
563 self
564 }
565
566 pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
568 self.poll_timeout = poll_timeout;
569 self
570 }
571
572 pub async fn connect(self) -> Result<KafkaConsumer> {
574 let topics = self
575 .topics
576 .into_iter()
577 .map(validate_topic_name)
578 .collect::<Result<Vec<_>>>()?;
579 let consumer = KafkaConsumer::connect(self.config).await?;
580 if !topics.is_empty()
581 && let Err(error) = consumer.subscribe(topics).await
582 {
583 let _ = consumer.shutdown().await;
584 return Err(error);
585 }
586 Ok(consumer.with_default_poll_timeout(self.poll_timeout))
587 }
588}
589
590fn validate_topic_name(topic: String) -> Result<String> {
591 let topic = topic.trim();
592 if topic.is_empty() {
593 return Err(anyhow!("topic must be non-empty").into());
594 }
595 Ok(topic.to_owned())
596}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601 use crate::SaslMechanism;
602
603 #[test]
604 fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
605 let client =
606 KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);
607
608 let producer = client
609 .topic("orders")
610 .producer()
611 .with_client_id("producer-a")
612 .with_default_partition(2)
613 .with_compression(ProducerCompression::Lz4)
614 .with_batch_size(32)
615 .with_linger(Duration::from_millis(10))
616 .with_delivery_timeout(Duration::from_secs(3))
617 .with_transactional_id("tx-a");
618
619 assert_eq!(
620 producer.config.bootstrap_servers,
621 vec!["host-a:9092", "host-b:9092"]
622 );
623 assert_eq!(producer.config.client_id, "producer-a");
624 assert_eq!(producer.config.compression, ProducerCompression::Lz4);
625 assert_eq!(producer.config.batch_size, 32);
626 assert_eq!(producer.config.linger, Duration::from_millis(10));
627 assert_eq!(producer.config.delivery_timeout, Duration::from_secs(3));
628 assert_eq!(producer.config.transactional_id.as_deref(), Some("tx-a"));
629 assert_eq!(producer.default_topic.as_deref(), Some("orders"));
630 assert_eq!(producer.default_partition, Some(2));
631 }
632
633 #[test]
634 fn admin_builder_forwards_security_and_timeout_options() {
635 let builder = KafkaClient::new("host-a:9092")
636 .admin()
637 .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
638 .with_client_id("admin-a")
639 .with_request_timeout(Duration::from_secs(9))
640 .with_security_protocol(SecurityProtocol::Ssl)
641 .with_tls(TlsConfig::new().with_server_name("kafka.internal"))
642 .with_sasl_scram_sha_512("user-a", "secret-a");
643
644 assert_eq!(
645 builder.config.bootstrap_servers,
646 vec!["host-b:9092", "host-c:9092"]
647 );
648 assert_eq!(builder.config.client_id, "admin-a");
649 assert_eq!(builder.config.request_timeout, Duration::from_secs(9));
650 assert_eq!(builder.config.security_protocol, SecurityProtocol::SaslSsl);
651 assert_eq!(
652 builder.config.tls.server_name.as_deref(),
653 Some("kafka.internal")
654 );
655 assert_eq!(builder.config.sasl.mechanism, SaslMechanism::ScramSha512);
656 }
657
658 #[test]
659 fn consumer_builder_collects_topics_and_group_options() {
660 let builder = KafkaClient::new("host-a:9092")
661 .topic("orders")
662 .consumer("group-a")
663 .with_bootstrap_servers(["host-b:9092"])
664 .with_client_id("consumer-a")
665 .with_topic("payments")
666 .with_topics(["shipments", "invoices"])
667 .with_auto_offset_reset(AutoOffsetReset::Latest)
668 .with_isolation_level(IsolationLevel::ReadCommitted)
669 .with_auto_commit(true)
670 .with_instance_id("instance-a")
671 .with_poll_timeout(Duration::from_millis(250))
672 .with_sasl_plain("user-a", "secret-a");
673
674 assert_eq!(builder.config.bootstrap_servers, vec!["host-b:9092"]);
675 assert_eq!(builder.config.client_id, "consumer-a");
676 assert_eq!(builder.config.group_id, "group-a");
677 assert_eq!(
678 builder.topics,
679 vec!["orders", "payments", "shipments", "invoices"]
680 );
681 assert_eq!(builder.config.auto_offset_reset, AutoOffsetReset::Latest);
682 assert_eq!(
683 builder.config.isolation_level,
684 IsolationLevel::ReadCommitted
685 );
686 assert!(builder.config.enable_auto_commit);
687 assert_eq!(builder.config.instance_id.as_deref(), Some("instance-a"));
688 assert_eq!(builder.poll_timeout, Duration::from_millis(250));
689 assert_eq!(
690 builder.config.security_protocol,
691 SecurityProtocol::SaslPlaintext
692 );
693 }
694
695 #[test]
696 fn topics_facade_collects_multi_topic_consumer_defaults() {
697 let builder = KafkaClient::new("host-a:9092")
698 .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
699 .topics(["orders", "payments"])
700 .consumer("group-a")
701 .with_topic("shipments");
702
703 assert_eq!(
704 builder.config.bootstrap_servers,
705 vec!["host-a:9092", "host-b:9092"]
706 );
707 assert_eq!(builder.config.group_id, "group-a");
708 assert_eq!(builder.topics, vec!["orders", "payments", "shipments"]);
709 }
710
711 #[test]
712 fn validate_topic_name_trims_and_rejects_empty_names() {
713 assert_eq!(
714 validate_topic_name(" topic-a ".to_owned()).unwrap(),
715 "topic-a"
716 );
717 assert!(validate_topic_name(" ".to_owned()).is_err());
718 }
719}