1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19
20use crate::admin::KafkaAdmin;
21use crate::consumer::KafkaConsumer;
22use crate::network::TcpConnector;
23use crate::producer::KafkaProducer;
24use crate::{
25 AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
26 ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
27};
28use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
29
/// Poll timeout a freshly created [`ConsumerBuilder`] carries until
/// `with_poll_timeout` replaces it (one second).
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(1_000);
31
32#[derive(Debug, Clone)]
33pub struct KafkaClient {
35 bootstrap_servers: Vec<String>,
36}
37
38impl KafkaClient {
39 pub fn new(bootstrap_server: impl Into<String>) -> Self {
41 Self {
42 bootstrap_servers: vec![bootstrap_server.into()],
43 }
44 }
45
46 pub fn with_bootstrap_servers(
48 mut self,
49 servers: impl IntoIterator<Item = impl Into<String>>,
50 ) -> Self {
51 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
52 self
53 }
54
55 pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
57 KafkaTopic {
58 bootstrap_servers: self.bootstrap_servers.clone(),
59 topic: topic.into(),
60 }
61 }
62
63 pub fn producer(&self) -> ProducerBuilder {
65 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
66 }
67
68 pub fn admin(&self) -> AdminBuilder {
70 AdminBuilder::from_servers(self.bootstrap_servers.clone())
71 }
72
73 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
75 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
76 }
77}
78
79#[derive(Debug, Clone)]
80pub struct KafkaTopic {
82 bootstrap_servers: Vec<String>,
83 topic: String,
84}
85
86impl KafkaTopic {
87 pub fn producer(&self) -> ProducerBuilder {
89 ProducerBuilder::from_servers(self.bootstrap_servers.clone())
90 .with_default_topic(self.topic.clone())
91 }
92
93 pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
95 ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
96 .with_topic(self.topic.clone())
97 }
98}
99
100#[derive(Debug, Clone)]
101pub struct ProducerBuilder {
103 config: ProducerConfig,
104 default_topic: Option<String>,
105 default_partition: Option<i32>,
106}
107
108impl ProducerBuilder {
109 fn new(bootstrap_server: impl Into<String>) -> Self {
110 Self {
111 config: ProducerConfig::new(bootstrap_server),
112 default_topic: None,
113 default_partition: None,
114 }
115 }
116
117 fn from_servers(servers: Vec<String>) -> Self {
118 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
119 builder.config = builder.config.with_bootstrap_servers(servers);
120 builder
121 }
122
123 pub fn with_bootstrap_servers(
125 mut self,
126 servers: impl IntoIterator<Item = impl Into<String>>,
127 ) -> Self {
128 self.config = self.config.with_bootstrap_servers(servers);
129 self
130 }
131
132 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
134 self.config = self.config.with_client_id(client_id);
135 self
136 }
137
138 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
140 self.config = self.config.with_security_protocol(security_protocol);
141 self
142 }
143
144 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
146 self.config = self.config.with_tls(tls);
147 self
148 }
149
150 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
152 self.config = self.config.with_sasl(sasl);
153 self
154 }
155
156 pub fn with_sasl_plain(
158 mut self,
159 username: impl Into<String>,
160 password: impl Into<String>,
161 ) -> Self {
162 self.config = self.config.with_sasl_plain(username, password);
163 self
164 }
165
166 pub fn with_sasl_scram_sha_256(
168 mut self,
169 username: impl Into<String>,
170 password: impl Into<String>,
171 ) -> Self {
172 self.config = self.config.with_sasl_scram_sha_256(username, password);
173 self
174 }
175
176 pub fn with_sasl_scram_sha_512(
178 mut self,
179 username: impl Into<String>,
180 password: impl Into<String>,
181 ) -> Self {
182 self.config = self.config.with_sasl_scram_sha_512(username, password);
183 self
184 }
185
186 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
188 self.config = self.config.with_compression(compression);
189 self
190 }
191
192 pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
194 self.default_topic = Some(topic.into());
195 self
196 }
197
198 pub fn with_default_partition(mut self, partition: i32) -> Self {
200 self.default_partition = Some(partition);
201 self
202 }
203
204 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
206 self.config = self.config.with_enable_idempotence(enable_idempotence);
207 self
208 }
209
210 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
212 self.config = self.config.with_batch_size(batch_size);
213 self
214 }
215
216 pub fn with_linger(mut self, linger: Duration) -> Self {
218 self.config = self.config.with_linger(linger);
219 self
220 }
221
222 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
224 self.config = self.config.with_delivery_timeout(delivery_timeout);
225 self
226 }
227
228 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
230 self.config = self.config.with_request_timeout(request_timeout);
231 self
232 }
233
234 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
236 self.config = self.config.with_retry_backoff(retry_backoff);
237 self
238 }
239
240 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
242 self.config = self.config.with_max_retries(max_retries);
243 self
244 }
245
246 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
248 self.config = self
249 .config
250 .with_max_in_flight_requests_per_connection(max_in_flight);
251 self
252 }
253
254 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
256 self.config = self.config.with_transactional_id(transactional_id);
257 self
258 }
259
260 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
262 self.config = self.config.with_tcp_connector(tcp_connector);
263 self
264 }
265
266 pub async fn connect(self) -> Result<KafkaProducer> {
268 let producer = KafkaProducer::connect(self.config).await?;
269 Ok(producer.with_defaults(self.default_topic, self.default_partition))
270 }
271}
272
273#[derive(Debug, Clone)]
274pub struct AdminBuilder {
276 config: AdminConfig,
277}
278
279impl AdminBuilder {
280 fn new(bootstrap_server: impl Into<String>) -> Self {
281 Self {
282 config: AdminConfig::new(bootstrap_server),
283 }
284 }
285
286 fn from_servers(servers: Vec<String>) -> Self {
287 let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
288 builder.config = builder.config.with_bootstrap_servers(servers);
289 builder
290 }
291
292 pub fn with_bootstrap_servers(
294 mut self,
295 servers: impl IntoIterator<Item = impl Into<String>>,
296 ) -> Self {
297 self.config = self.config.with_bootstrap_servers(servers);
298 self
299 }
300
301 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
303 self.config = self.config.with_client_id(client_id);
304 self
305 }
306
307 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
309 self.config = self.config.with_request_timeout(request_timeout);
310 self
311 }
312
313 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
315 self.config = self.config.with_security_protocol(security_protocol);
316 self
317 }
318
319 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
321 self.config = self.config.with_tls(tls);
322 self
323 }
324
325 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
327 self.config = self.config.with_sasl(sasl);
328 self
329 }
330
331 pub fn with_sasl_plain(
333 mut self,
334 username: impl Into<String>,
335 password: impl Into<String>,
336 ) -> Self {
337 self.config = self.config.with_sasl_plain(username, password);
338 self
339 }
340
341 pub fn with_sasl_scram_sha_256(
343 mut self,
344 username: impl Into<String>,
345 password: impl Into<String>,
346 ) -> Self {
347 self.config = self.config.with_sasl_scram_sha_256(username, password);
348 self
349 }
350
351 pub fn with_sasl_scram_sha_512(
353 mut self,
354 username: impl Into<String>,
355 password: impl Into<String>,
356 ) -> Self {
357 self.config = self.config.with_sasl_scram_sha_512(username, password);
358 self
359 }
360
361 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
363 self.config = self.config.with_tcp_connector(tcp_connector);
364 self
365 }
366
367 pub async fn connect(self) -> Result<KafkaAdmin> {
369 KafkaAdmin::connect(self.config).await
370 }
371}
372
373#[derive(Debug, Clone)]
374pub struct ConsumerBuilder {
376 config: ConsumerConfig,
377 topics: Vec<String>,
378 poll_timeout: Duration,
379}
380
381impl ConsumerBuilder {
382 fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
383 Self {
384 config: ConsumerConfig::new(bootstrap_server, group_id),
385 topics: Vec::new(),
386 poll_timeout: DEFAULT_POLL_TIMEOUT,
387 }
388 }
389
390 fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
391 let first = servers.first().cloned().unwrap_or_default();
392 let mut builder = Self::new(first, group_id);
393 builder.config = builder.config.with_bootstrap_servers(servers);
394 builder
395 }
396
397 pub fn with_bootstrap_servers(
399 mut self,
400 servers: impl IntoIterator<Item = impl Into<String>>,
401 ) -> Self {
402 self.config = self.config.with_bootstrap_servers(servers);
403 self
404 }
405
406 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
408 self.config = self.config.with_client_id(client_id);
409 self
410 }
411
412 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
414 self.config = self.config.with_security_protocol(security_protocol);
415 self
416 }
417
418 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
420 self.config = self.config.with_tls(tls);
421 self
422 }
423
424 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
426 self.config = self.config.with_sasl(sasl);
427 self
428 }
429
430 pub fn with_sasl_plain(
432 mut self,
433 username: impl Into<String>,
434 password: impl Into<String>,
435 ) -> Self {
436 self.config = self.config.with_sasl_plain(username, password);
437 self
438 }
439
440 pub fn with_sasl_scram_sha_256(
442 mut self,
443 username: impl Into<String>,
444 password: impl Into<String>,
445 ) -> Self {
446 self.config = self.config.with_sasl_scram_sha_256(username, password);
447 self
448 }
449
450 pub fn with_sasl_scram_sha_512(
452 mut self,
453 username: impl Into<String>,
454 password: impl Into<String>,
455 ) -> Self {
456 self.config = self.config.with_sasl_scram_sha_512(username, password);
457 self
458 }
459
460 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
462 self.topics.push(topic.into());
463 self
464 }
465
466 pub fn with_topics<I, S>(mut self, topics: I) -> Self
468 where
469 I: IntoIterator<Item = S>,
470 S: Into<String>,
471 {
472 self.topics.extend(topics.into_iter().map(Into::into));
473 self
474 }
475
476 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
478 self.config = self.config.with_auto_offset_reset(auto_offset_reset);
479 self
480 }
481
482 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
484 self.config = self.config.with_isolation_level(isolation_level);
485 self
486 }
487
488 pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
490 self.config = self.config.with_enable_auto_commit(enable_auto_commit);
491 self
492 }
493
494 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
496 self.config = self.config.with_request_timeout(request_timeout);
497 self
498 }
499
500 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
502 self.config = self.config.with_retry_backoff(retry_backoff);
503 self
504 }
505
506 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
508 self.config = self.config.with_max_retries(max_retries);
509 self
510 }
511
512 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
514 self.config = self.config.with_instance_id(instance_id);
515 self
516 }
517
518 pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
520 self.config = self.config.with_rebalance_listener(listener);
521 self
522 }
523
524 pub fn with_rebalance_callback(
526 mut self,
527 callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
528 ) -> Self {
529 self.config = self.config.with_rebalance_callback(callback);
530 self
531 }
532
533 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
535 self.config = self.config.with_tcp_connector(tcp_connector);
536 self
537 }
538
539 pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
541 self.poll_timeout = poll_timeout;
542 self
543 }
544
545 pub async fn connect(self) -> Result<KafkaConsumer> {
547 let topics = self
548 .topics
549 .into_iter()
550 .map(validate_topic_name)
551 .collect::<Result<Vec<_>>>()?;
552 let consumer = KafkaConsumer::connect(self.config).await?;
553 if !topics.is_empty()
554 && let Err(error) = consumer.subscribe(topics).await
555 {
556 let _ = consumer.shutdown().await;
557 return Err(error);
558 }
559 Ok(consumer.with_default_poll_timeout(self.poll_timeout))
560 }
561}
562
563fn validate_topic_name(topic: String) -> Result<String> {
564 let topic = topic.trim();
565 if topic.is_empty() {
566 return Err(anyhow!("topic must be non-empty").into());
567 }
568 Ok(topic.to_owned())
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SaslMechanism;

    /// A producer builder obtained through a topic handle keeps the client's
    /// server list, the topic default, and every option set on it.
    #[test]
    fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
        let client =
            KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);
        let orders = client.topic("orders");

        let ProducerBuilder {
            config,
            default_topic,
            default_partition,
        } = orders
            .producer()
            .with_client_id("producer-a")
            .with_default_partition(2)
            .with_compression(ProducerCompression::Lz4)
            .with_batch_size(32)
            .with_linger(Duration::from_millis(10))
            .with_delivery_timeout(Duration::from_secs(3))
            .with_transactional_id("tx-a");

        assert_eq!(
            config.bootstrap_servers,
            vec!["host-a:9092", "host-b:9092"]
        );
        assert_eq!(config.client_id, "producer-a");
        assert_eq!(config.compression, ProducerCompression::Lz4);
        assert_eq!(config.batch_size, 32);
        assert_eq!(config.linger, Duration::from_millis(10));
        assert_eq!(config.delivery_timeout, Duration::from_secs(3));
        assert_eq!(config.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(default_topic.as_deref(), Some("orders"));
        assert_eq!(default_partition, Some(2));
    }

    /// Options set on the admin builder end up in its config.
    #[test]
    fn admin_builder_forwards_security_and_timeout_options() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let AdminBuilder { config } = KafkaClient::new("host-a:9092")
            .admin()
            .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
            .with_client_id("admin-a")
            .with_request_timeout(Duration::from_secs(9))
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls(tls)
            .with_sasl_scram_sha_512("user-a", "secret-a");

        assert_eq!(
            config.bootstrap_servers,
            vec!["host-b:9092", "host-c:9092"]
        );
        assert_eq!(config.client_id, "admin-a");
        assert_eq!(config.request_timeout, Duration::from_secs(9));
        assert_eq!(config.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));
        assert_eq!(config.sasl.mechanism, SaslMechanism::ScramSha512);
    }

    /// The consumer builder keeps topics in insertion order and stores group
    /// and polling options.
    #[test]
    fn consumer_builder_collects_topics_and_group_options() {
        let orders = KafkaClient::new("host-a:9092").topic("orders");
        let ConsumerBuilder {
            config,
            topics,
            poll_timeout,
        } = orders
            .consumer("group-a")
            .with_bootstrap_servers(["host-b:9092"])
            .with_client_id("consumer-a")
            .with_topic("payments")
            .with_topics(["shipments", "invoices"])
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_auto_commit(true)
            .with_instance_id("instance-a")
            .with_poll_timeout(Duration::from_millis(250))
            .with_sasl_plain("user-a", "secret-a");

        assert_eq!(config.bootstrap_servers, vec!["host-b:9092"]);
        assert_eq!(config.client_id, "consumer-a");
        assert_eq!(config.group_id, "group-a");
        assert_eq!(
            topics,
            vec!["orders", "payments", "shipments", "invoices"]
        );
        assert_eq!(config.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(config.isolation_level, IsolationLevel::ReadCommitted);
        assert!(config.enable_auto_commit);
        assert_eq!(config.instance_id.as_deref(), Some("instance-a"));
        assert_eq!(poll_timeout, Duration::from_millis(250));
        assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
    }

    /// Padded names come back trimmed; whitespace-only names are rejected.
    #[test]
    fn validate_topic_name_trims_and_rejects_empty_names() {
        let trimmed = validate_topic_name(String::from(" topic-a ")).unwrap();
        assert_eq!(trimmed, "topic-a");

        let blank = validate_topic_name(String::from(" "));
        assert!(blank.is_err());
    }
}