1use std::fmt;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21
22use kafka_protocol::records::Compression;
23
24use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
25use crate::network::{TcpConnector, TokioTcpConnector};
26use crate::types::ConsumerRebalanceListener;
27
/// Transport security mode for broker connections.
///
/// The four variants cover every combination of TLS on/off and SASL on/off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL.
    Plaintext,
    /// TLS without SASL.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL together with TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns `true` for `Ssl` and `SaslSsl`.
    pub fn uses_tls(self) -> bool {
        match self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns `true` for `SaslPlaintext` and `SaslSsl`.
    pub fn uses_sasl(self) -> bool {
        match self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }

    /// Returns the SASL-enabled counterpart of `self`, keeping the TLS choice unchanged.
    fn with_sasl(self) -> Self {
        match self {
            Self::Ssl | Self::SaslSsl => Self::SaslSsl,
            Self::Plaintext | Self::SaslPlaintext => Self::SaslPlaintext,
        }
    }
}
60
/// SASL authentication mechanism; `Plain` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SaslMechanism {
    /// The `PLAIN` mechanism.
    #[default]
    Plain,
    /// The `SCRAM-SHA-256` mechanism.
    ScramSha256,
    /// The `SCRAM-SHA-512` mechanism.
    ScramSha512,
}

impl SaslMechanism {
    /// Returns the mechanism name as an upper-case string such as `"SCRAM-SHA-256"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::Plain => "PLAIN",
        }
    }

    /// Returns `true` for the two SCRAM variants.
    pub fn is_scram(self) -> bool {
        // Exactly the SCRAM variants carry a numeric type code.
        self.scram_type().is_some()
    }

    /// Returns the numeric SCRAM code (`1` for SHA-256, `2` for SHA-512), or `None` for `Plain`.
    ///
    /// NOTE(review): presumably these match the broker-side SCRAM mechanism codes — confirm
    /// against the protocol definition used by callers.
    pub fn scram_type(self) -> Option<i8> {
        let code = match self {
            Self::Plain => return None,
            Self::ScramSha256 => 1,
            Self::ScramSha512 => 2,
        };
        Some(code)
    }
}
97
98#[derive(Clone, Default, PartialEq, Eq)]
99pub struct SaslConfig {
101 pub mechanism: SaslMechanism,
103 pub username: Option<String>,
105 pub password: Option<String>,
107 pub authorization_id: Option<String>,
109}
110
111impl fmt::Debug for SaslConfig {
112 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
113 formatter
114 .debug_struct("SaslConfig")
115 .field("mechanism", &self.mechanism)
116 .field("username", &self.username)
117 .field("password", &self.password.as_ref().map(|_| "<redacted>"))
118 .field("authorization_id", &self.authorization_id)
119 .finish()
120 }
121}
122
123impl SaslConfig {
124 pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
126 Self {
127 mechanism: SaslMechanism::Plain,
128 username: Some(username.into()),
129 password: Some(password.into()),
130 authorization_id: None,
131 }
132 }
133
134 pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
136 Self {
137 mechanism: SaslMechanism::ScramSha256,
138 username: Some(username.into()),
139 password: Some(password.into()),
140 authorization_id: None,
141 }
142 }
143
144 pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
146 Self {
147 mechanism: SaslMechanism::ScramSha512,
148 username: Some(username.into()),
149 password: Some(password.into()),
150 authorization_id: None,
151 }
152 }
153
154 pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
156 self.authorization_id = Some(authorization_id.into());
157 self
158 }
159}
160
/// Optional TLS material locations and server-name override.
///
/// Nothing in this type reads or validates the paths; it only stores them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// Optional CA certificate path.
    pub ca_cert_path: Option<PathBuf>,
    /// Optional client certificate path.
    pub client_cert_path: Option<PathBuf>,
    /// Optional client private-key path.
    pub client_key_path: Option<PathBuf>,
    /// Optional server name override.
    pub server_name: Option<String>,
}

impl TlsConfig {
    /// Creates an empty configuration (every field `None`); same as `Default`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with `ca_cert_path` set.
    pub fn with_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with `client_cert_path` set.
    pub fn with_client_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with `client_key_path` set.
    pub fn with_client_key_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_key_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with `server_name` set.
    pub fn with_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
204
/// Partitioning strategy selector for the producer.
///
/// Only one strategy exists at present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerPartitioner {
    /// The sole available strategy; its behaviour is implemented outside this file.
    Default,
}
211
/// Compression codec choice for produced record batches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerCompression {
    /// No compression.
    None,
    /// Gzip.
    Gzip,
    /// Snappy.
    Snappy,
    /// LZ4.
    Lz4,
    /// Zstandard.
    Zstd,
}
226
227impl From<ProducerCompression> for Compression {
228 fn from(value: ProducerCompression) -> Self {
229 match value {
230 ProducerCompression::None => Compression::None,
231 ProducerCompression::Gzip => Compression::Gzip,
232 ProducerCompression::Snappy => Compression::Snappy,
233 ProducerCompression::Lz4 => Compression::Lz4,
234 ProducerCompression::Zstd => Compression::Zstd,
235 }
236 }
237}
238
239#[derive(Debug, Clone)]
240pub struct ProducerConfig {
242 pub bootstrap_servers: Vec<String>,
244 pub client_id: String,
246 pub security_protocol: SecurityProtocol,
248 pub tls: TlsConfig,
250 pub sasl: SaslConfig,
252 pub acks: i16,
254 pub enable_idempotence: bool,
256 pub partitioner: ProducerPartitioner,
258 pub compression: ProducerCompression,
260 pub batch_size: usize,
262 pub buffer_memory: usize,
264 pub max_block: Duration,
266 pub max_request_size: usize,
268 pub partitioner_ignore_keys: bool,
270 pub linger: Duration,
272 pub delivery_timeout: Duration,
274 pub request_timeout: Duration,
276 pub metadata_max_age: Duration,
278 pub retry_backoff: Duration,
280 pub max_retries: usize,
282 pub max_in_flight_requests_per_connection: usize,
284 pub transactional_id: Option<String>,
286 pub transaction_timeout: Duration,
288 pub tcp_connector: Arc<dyn TcpConnector>,
290}
291
292impl ProducerConfig {
293 pub fn new(bootstrap_server: impl Into<String>) -> Self {
295 Self {
296 bootstrap_servers: vec![bootstrap_server.into()],
297 client_id: "rust-producer".to_owned(),
298 security_protocol: SecurityProtocol::Plaintext,
299 tls: TlsConfig::default(),
300 sasl: SaslConfig::default(),
301 acks: 1,
302 enable_idempotence: false,
303 partitioner: ProducerPartitioner::Default,
304 compression: ProducerCompression::None,
305 batch_size: 16 * 1024,
306 buffer_memory: 32 * 1024 * 1024,
307 max_block: Duration::from_secs(60),
308 max_request_size: 1024 * 1024,
309 partitioner_ignore_keys: false,
310 linger: Duration::ZERO,
311 delivery_timeout: Duration::from_secs(120),
312 request_timeout: Duration::from_secs(5),
313 metadata_max_age: Duration::from_secs(30),
314 retry_backoff: Duration::from_millis(250),
315 max_retries: 3,
316 max_in_flight_requests_per_connection: 5,
317 transactional_id: None,
318 transaction_timeout: Duration::from_secs(30),
319 tcp_connector: Arc::new(TokioTcpConnector),
320 }
321 }
322
323 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
325 self.client_id = client_id.into();
326 self
327 }
328
329 pub fn with_bootstrap_servers(
331 mut self,
332 servers: impl IntoIterator<Item = impl Into<String>>,
333 ) -> Self {
334 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
335 self
336 }
337
338 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
340 self.security_protocol = security_protocol;
341 self
342 }
343
344 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
346 self.security_protocol = if self.security_protocol.uses_sasl() {
347 SecurityProtocol::SaslSsl
348 } else {
349 SecurityProtocol::Ssl
350 };
351 self.tls = tls;
352 self
353 }
354
355 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
357 self.security_protocol = if self.security_protocol.uses_sasl() {
358 SecurityProtocol::SaslSsl
359 } else {
360 SecurityProtocol::Ssl
361 };
362 self.tls = self.tls.with_ca_cert_path(path);
363 self
364 }
365
366 pub fn with_tls_client_auth_paths(
368 mut self,
369 cert_path: impl Into<PathBuf>,
370 key_path: impl Into<PathBuf>,
371 ) -> Self {
372 self.security_protocol = if self.security_protocol.uses_sasl() {
373 SecurityProtocol::SaslSsl
374 } else {
375 SecurityProtocol::Ssl
376 };
377 self.tls = self
378 .tls
379 .with_client_cert_path(cert_path)
380 .with_client_key_path(key_path);
381 self
382 }
383
384 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
386 self.security_protocol = if self.security_protocol.uses_sasl() {
387 SecurityProtocol::SaslSsl
388 } else {
389 SecurityProtocol::Ssl
390 };
391 self.tls = self.tls.with_server_name(server_name);
392 self
393 }
394
395 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
397 self.security_protocol = self.security_protocol.with_sasl();
398 self.sasl = sasl;
399 self
400 }
401
402 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
404 self.with_sasl(SaslConfig::plain(username, password))
405 }
406
407 pub fn with_sasl_scram_sha_256(
409 self,
410 username: impl Into<String>,
411 password: impl Into<String>,
412 ) -> Self {
413 self.with_sasl(SaslConfig::scram_sha_256(username, password))
414 }
415
416 pub fn with_sasl_scram_sha_512(
418 self,
419 username: impl Into<String>,
420 password: impl Into<String>,
421 ) -> Self {
422 self.with_sasl(SaslConfig::scram_sha_512(username, password))
423 }
424
425 pub fn with_acks(mut self, acks: i16) -> Self {
427 self.acks = acks;
428 self
429 }
430
431 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
433 self.enable_idempotence = enable_idempotence;
434 if enable_idempotence {
435 self.acks = -1;
436 self.max_retries = self.max_retries.max(1);
437 }
438 self
439 }
440
441 pub fn with_partitioner(mut self, partitioner: ProducerPartitioner) -> Self {
443 self.partitioner = partitioner;
444 self
445 }
446
447 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
449 self.compression = compression;
450 self
451 }
452
453 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
455 self.batch_size = batch_size.max(1);
456 self
457 }
458
459 pub fn with_buffer_memory(mut self, buffer_memory: usize) -> Self {
461 self.buffer_memory = buffer_memory;
462 self
463 }
464
465 pub fn with_max_block(mut self, max_block: Duration) -> Self {
467 self.max_block = max_block;
468 self
469 }
470
471 pub fn with_max_request_size(mut self, max_request_size: usize) -> Self {
473 self.max_request_size = max_request_size;
474 self
475 }
476
477 pub fn with_partitioner_ignore_keys(mut self, ignore_keys: bool) -> Self {
479 self.partitioner_ignore_keys = ignore_keys;
480 self
481 }
482
483 pub fn with_linger(mut self, linger: Duration) -> Self {
485 self.linger = linger;
486 self
487 }
488
489 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
491 self.delivery_timeout = delivery_timeout;
492 self
493 }
494
495 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
497 self.request_timeout = request_timeout;
498 self
499 }
500
501 pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
503 self.metadata_max_age = metadata_max_age;
504 self
505 }
506
507 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
509 self.retry_backoff = retry_backoff;
510 self
511 }
512
513 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
515 self.max_retries = max_retries;
516 self
517 }
518
519 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
521 self.max_in_flight_requests_per_connection = max_in_flight.max(1);
522 self
523 }
524
525 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
527 self.transactional_id = Some(transactional_id.into());
528 self.acks = -1;
529 self.enable_idempotence = true;
530 self
531 }
532
533 pub fn with_transaction_timeout(mut self, transaction_timeout: Duration) -> Self {
535 self.transaction_timeout = transaction_timeout;
536 self
537 }
538
539 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
541 self.tcp_connector = tcp_connector;
542 self
543 }
544
545 pub fn is_transactional(&self) -> bool {
547 self.transactional_id.is_some()
548 }
549
550 pub fn is_idempotent(&self) -> bool {
552 self.enable_idempotence
553 }
554}
555
556pub type ClientConfig = ProducerConfig;
558
559#[derive(Debug, Clone)]
560pub struct AdminConfig {
562 pub bootstrap_servers: Vec<String>,
564 pub client_id: String,
566 pub security_protocol: SecurityProtocol,
568 pub tls: TlsConfig,
570 pub sasl: SaslConfig,
572 pub request_timeout: Duration,
574 pub tcp_connector: Arc<dyn TcpConnector>,
576}
577
578impl AdminConfig {
579 pub fn new(bootstrap_server: impl Into<String>) -> Self {
581 Self {
582 bootstrap_servers: vec![bootstrap_server.into()],
583 client_id: "rust-admin".to_owned(),
584 security_protocol: SecurityProtocol::Plaintext,
585 tls: TlsConfig::default(),
586 sasl: SaslConfig::default(),
587 request_timeout: Duration::from_secs(5),
588 tcp_connector: Arc::new(TokioTcpConnector),
589 }
590 }
591
592 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
594 self.client_id = client_id.into();
595 self
596 }
597
598 pub fn with_bootstrap_servers(
600 mut self,
601 servers: impl IntoIterator<Item = impl Into<String>>,
602 ) -> Self {
603 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
604 self
605 }
606
607 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
609 self.security_protocol = security_protocol;
610 self
611 }
612
613 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
615 self.security_protocol = if self.security_protocol.uses_sasl() {
616 SecurityProtocol::SaslSsl
617 } else {
618 SecurityProtocol::Ssl
619 };
620 self.tls = tls;
621 self
622 }
623
624 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
626 self.security_protocol = if self.security_protocol.uses_sasl() {
627 SecurityProtocol::SaslSsl
628 } else {
629 SecurityProtocol::Ssl
630 };
631 self.tls = self.tls.with_ca_cert_path(path);
632 self
633 }
634
635 pub fn with_tls_client_auth_paths(
637 mut self,
638 cert_path: impl Into<PathBuf>,
639 key_path: impl Into<PathBuf>,
640 ) -> Self {
641 self.security_protocol = if self.security_protocol.uses_sasl() {
642 SecurityProtocol::SaslSsl
643 } else {
644 SecurityProtocol::Ssl
645 };
646 self.tls = self
647 .tls
648 .with_client_cert_path(cert_path)
649 .with_client_key_path(key_path);
650 self
651 }
652
653 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
655 self.security_protocol = if self.security_protocol.uses_sasl() {
656 SecurityProtocol::SaslSsl
657 } else {
658 SecurityProtocol::Ssl
659 };
660 self.tls = self.tls.with_server_name(server_name);
661 self
662 }
663
664 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
666 self.security_protocol = self.security_protocol.with_sasl();
667 self.sasl = sasl;
668 self
669 }
670
671 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
673 self.with_sasl(SaslConfig::plain(username, password))
674 }
675
676 pub fn with_sasl_scram_sha_256(
678 self,
679 username: impl Into<String>,
680 password: impl Into<String>,
681 ) -> Self {
682 self.with_sasl(SaslConfig::scram_sha_256(username, password))
683 }
684
685 pub fn with_sasl_scram_sha_512(
687 self,
688 username: impl Into<String>,
689 password: impl Into<String>,
690 ) -> Self {
691 self.with_sasl(SaslConfig::scram_sha_512(username, password))
692 }
693
694 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
696 self.request_timeout = request_timeout;
697 self
698 }
699
700 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
702 self.tcp_connector = tcp_connector;
703 self
704 }
705}
706
/// Offset reset policy selector for the consumer.
///
/// NOTE(review): presumably applied when no committed offset is available — the logic
/// lives outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Reset to the earliest offset.
    Earliest,
    /// Reset to the latest offset.
    Latest,
}
715
/// Consumer isolation level selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read-uncommitted isolation.
    ReadUncommitted,
    /// Read-committed isolation.
    ReadCommitted,
}
724
725impl IsolationLevel {
726 pub fn as_protocol_value(self) -> i8 {
728 match self {
729 Self::ReadUncommitted => READ_UNCOMMITTED,
730 Self::ReadCommitted => READ_COMMITTED,
731 }
732 }
733}
734
735#[derive(Debug, Clone)]
736pub struct ConsumerConfig {
738 pub bootstrap_servers: Vec<String>,
740 pub client_id: String,
742 pub group_id: String,
744 pub security_protocol: SecurityProtocol,
746 pub tls: TlsConfig,
748 pub sasl: SaslConfig,
750 pub request_timeout: Duration,
752 pub metadata_max_age: Duration,
754 pub retry_backoff: Duration,
756 pub max_retries: usize,
758 pub rebalance_timeout: Duration,
760 pub fetch_max_wait: Duration,
762 pub fetch_min_bytes: i32,
764 pub fetch_max_bytes: i32,
766 pub partition_max_bytes: i32,
768 pub auto_offset_reset: AutoOffsetReset,
770 pub isolation_level: IsolationLevel,
772 pub enable_auto_commit: bool,
774 pub auto_commit_interval: Duration,
776 pub server_assignor: Option<String>,
778 pub rack_id: Option<String>,
780 pub instance_id: Option<String>,
782 pub rebalance_listener: Option<ConsumerRebalanceListener>,
784 pub tcp_connector: Arc<dyn TcpConnector>,
786}
787
788impl ConsumerConfig {
789 pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
791 Self {
792 bootstrap_servers: vec![bootstrap_server.into()],
793 client_id: "rust-consumer".to_owned(),
794 group_id: group_id.into(),
795 security_protocol: SecurityProtocol::Plaintext,
796 tls: TlsConfig::default(),
797 sasl: SaslConfig::default(),
798 request_timeout: Duration::from_secs(5),
799 metadata_max_age: Duration::from_secs(30),
800 retry_backoff: Duration::from_millis(250),
801 max_retries: 3,
802 rebalance_timeout: Duration::from_secs(30),
803 fetch_max_wait: Duration::from_millis(500),
804 fetch_min_bytes: 1,
805 fetch_max_bytes: 50 * 1024 * 1024,
806 partition_max_bytes: 1024 * 1024,
807 auto_offset_reset: AutoOffsetReset::Earliest,
808 isolation_level: IsolationLevel::ReadUncommitted,
809 enable_auto_commit: false,
810 auto_commit_interval: Duration::from_secs(5),
811 server_assignor: None,
812 rack_id: None,
813 instance_id: None,
814 rebalance_listener: None,
815 tcp_connector: Arc::new(TokioTcpConnector),
816 }
817 }
818
819 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
821 self.client_id = client_id.into();
822 self
823 }
824
825 pub fn with_bootstrap_servers(
827 mut self,
828 servers: impl IntoIterator<Item = impl Into<String>>,
829 ) -> Self {
830 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
831 self
832 }
833
834 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
836 self.security_protocol = security_protocol;
837 self
838 }
839
840 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
842 self.security_protocol = if self.security_protocol.uses_sasl() {
843 SecurityProtocol::SaslSsl
844 } else {
845 SecurityProtocol::Ssl
846 };
847 self.tls = tls;
848 self
849 }
850
851 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
853 self.security_protocol = if self.security_protocol.uses_sasl() {
854 SecurityProtocol::SaslSsl
855 } else {
856 SecurityProtocol::Ssl
857 };
858 self.tls = self.tls.with_ca_cert_path(path);
859 self
860 }
861
862 pub fn with_tls_client_auth_paths(
864 mut self,
865 cert_path: impl Into<PathBuf>,
866 key_path: impl Into<PathBuf>,
867 ) -> Self {
868 self.security_protocol = if self.security_protocol.uses_sasl() {
869 SecurityProtocol::SaslSsl
870 } else {
871 SecurityProtocol::Ssl
872 };
873 self.tls = self
874 .tls
875 .with_client_cert_path(cert_path)
876 .with_client_key_path(key_path);
877 self
878 }
879
880 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
882 self.security_protocol = if self.security_protocol.uses_sasl() {
883 SecurityProtocol::SaslSsl
884 } else {
885 SecurityProtocol::Ssl
886 };
887 self.tls = self.tls.with_server_name(server_name);
888 self
889 }
890
891 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
893 self.security_protocol = self.security_protocol.with_sasl();
894 self.sasl = sasl;
895 self
896 }
897
898 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
900 self.with_sasl(SaslConfig::plain(username, password))
901 }
902
903 pub fn with_sasl_scram_sha_256(
905 self,
906 username: impl Into<String>,
907 password: impl Into<String>,
908 ) -> Self {
909 self.with_sasl(SaslConfig::scram_sha_256(username, password))
910 }
911
912 pub fn with_sasl_scram_sha_512(
914 self,
915 username: impl Into<String>,
916 password: impl Into<String>,
917 ) -> Self {
918 self.with_sasl(SaslConfig::scram_sha_512(username, password))
919 }
920
921 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
923 self.request_timeout = request_timeout;
924 self
925 }
926
927 pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
929 self.metadata_max_age = metadata_max_age;
930 self
931 }
932
933 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
935 self.retry_backoff = retry_backoff;
936 self
937 }
938
939 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
941 self.max_retries = max_retries;
942 self
943 }
944
945 pub fn with_rebalance_timeout(mut self, rebalance_timeout: Duration) -> Self {
947 self.rebalance_timeout = rebalance_timeout;
948 self
949 }
950
951 pub fn with_fetch_max_wait(mut self, fetch_max_wait: Duration) -> Self {
953 self.fetch_max_wait = fetch_max_wait;
954 self
955 }
956
957 pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
959 self.fetch_min_bytes = fetch_min_bytes;
960 self
961 }
962
963 pub fn with_fetch_max_bytes(mut self, fetch_max_bytes: i32) -> Self {
965 self.fetch_max_bytes = fetch_max_bytes;
966 self
967 }
968
969 pub fn with_partition_max_bytes(mut self, partition_max_bytes: i32) -> Self {
971 self.partition_max_bytes = partition_max_bytes;
972 self
973 }
974
975 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
977 self.auto_offset_reset = auto_offset_reset;
978 self
979 }
980
981 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
983 self.isolation_level = isolation_level;
984 self
985 }
986
987 pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
989 self.enable_auto_commit = enable_auto_commit;
990 self
991 }
992
993 pub fn with_auto_commit_interval(mut self, auto_commit_interval: Duration) -> Self {
995 self.auto_commit_interval = auto_commit_interval;
996 self
997 }
998
999 pub fn with_server_assignor(mut self, server_assignor: impl Into<String>) -> Self {
1001 self.server_assignor = Some(server_assignor.into());
1002 self
1003 }
1004
1005 pub fn with_rack_id(mut self, rack_id: impl Into<String>) -> Self {
1007 self.rack_id = Some(rack_id.into());
1008 self
1009 }
1010
1011 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
1013 self.instance_id = Some(instance_id.into());
1014 self
1015 }
1016
1017 pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
1019 self.rebalance_listener = Some(listener);
1020 self
1021 }
1022
1023 pub fn with_rebalance_callback(
1025 self,
1026 callback: impl Fn(crate::types::ConsumerRebalanceEvent) + Send + Sync + 'static,
1027 ) -> Self {
1028 self.with_rebalance_listener(ConsumerRebalanceListener::new(callback))
1029 }
1030
1031 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
1033 self.tcp_connector = tcp_connector;
1034 self
1035 }
1036}
1037
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    // TLS builders on a plaintext config switch the protocol to `Ssl`.
    #[test]
    fn producer_tls_builder_enables_ssl() {
        let producer =
            ProducerConfig::new("localhost:9093").with_tls_ca_cert_path("/tmp/cluster-ca.pem");

        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            producer.tls.ca_cert_path.as_deref(),
            Some(Path::new("/tmp/cluster-ca.pem"))
        );
    }

    // SASL builders on a plaintext config switch the protocol to `SaslPlaintext`.
    #[test]
    fn producer_sasl_plain_builder_enables_sasl_plaintext() {
        let producer = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");
        let sasl = &producer.sasl;

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(sasl.mechanism, SaslMechanism::Plain);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    // The password must never appear in `Debug` output, even via the outer config.
    #[test]
    fn sasl_config_debug_redacts_passwords() {
        let producer = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");

        let rendered = format!("{producer:?}");

        assert!(rendered.contains("user-a"));
        assert!(rendered.contains("<redacted>"));
        assert!(!rendered.contains("secret-a"));
    }

    #[test]
    fn producer_sasl_scram_builder_enables_sasl_plaintext() {
        let producer =
            ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256("user-a", "secret-a");
        let sasl = &producer.sasl;

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(sasl.mechanism, SaslMechanism::ScramSha256);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    // Combining TLS and SASL yields `SaslSsl` regardless of builder order.
    #[test]
    fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
        let sasl_first = ProducerConfig::new("localhost:9093")
            .with_sasl_plain("user-a", "secret-a")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
        assert_eq!(sasl_first.security_protocol, SecurityProtocol::SaslSsl);

        let tls_first = ProducerConfig::new("localhost:9093")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
            .with_sasl_plain("user-a", "secret-a");
        assert_eq!(tls_first.security_protocol, SecurityProtocol::SaslSsl);
    }

    #[test]
    fn producer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092", "host3:9092"];
        let producer = ProducerConfig::new("host1:9092").with_bootstrap_servers(brokers);

        assert_eq!(
            producer.bootstrap_servers,
            vec!["host1:9092", "host2:9092", "host3:9092"]
        );
    }

    #[test]
    fn producer_builder_records_tuning_and_transaction_options() {
        let producer = ProducerConfig::new("localhost:9092")
            .with_client_id("producer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_tls_server_name("kafka.internal")
            .with_acks(-1)
            .with_enable_idempotence(true)
            .with_partitioner(ProducerPartitioner::Default)
            .with_compression(ProducerCompression::Zstd)
            .with_batch_size(0)
            .with_buffer_memory(4096)
            .with_max_block(Duration::from_millis(250))
            .with_max_request_size(2048)
            .with_partitioner_ignore_keys(true)
            .with_linger(Duration::from_millis(50))
            .with_delivery_timeout(Duration::from_secs(10))
            .with_request_timeout(Duration::from_secs(2))
            .with_metadata_max_age(Duration::from_secs(60))
            .with_retry_backoff(Duration::from_millis(75))
            .with_max_retries(7)
            .with_transactional_id("tx-a")
            .with_transaction_timeout(Duration::from_secs(45));

        assert_eq!(producer.client_id, "producer-a");
        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            producer.tls.client_cert_path.as_deref(),
            Some(Path::new("/tmp/client.crt"))
        );
        assert_eq!(
            producer.tls.client_key_path.as_deref(),
            Some(Path::new("/tmp/client.key"))
        );
        assert_eq!(producer.tls.server_name.as_deref(), Some("kafka.internal"));
        assert_eq!(producer.acks, -1);
        assert!(producer.is_idempotent());
        assert_eq!(producer.compression, ProducerCompression::Zstd);
        // A batch size of zero is clamped up to one.
        assert_eq!(producer.batch_size, 1);
        assert_eq!(producer.buffer_memory, 4096);
        assert_eq!(producer.max_block, Duration::from_millis(250));
        assert_eq!(producer.max_request_size, 2048);
        assert!(producer.partitioner_ignore_keys);
        assert_eq!(producer.linger, Duration::from_millis(50));
        assert_eq!(producer.delivery_timeout, Duration::from_secs(10));
        assert_eq!(producer.request_timeout, Duration::from_secs(2));
        assert_eq!(producer.metadata_max_age, Duration::from_secs(60));
        assert_eq!(producer.retry_backoff, Duration::from_millis(75));
        assert_eq!(producer.max_retries, 7);
        assert_eq!(producer.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(producer.transaction_timeout, Duration::from_secs(45));
        assert!(producer.is_transactional());
    }

    #[test]
    fn consumer_tls_builder_keeps_override_server_name() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let consumer = ConsumerConfig::new("localhost:9093", "group-a").with_tls(tls);

        assert_eq!(consumer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(consumer.tls.server_name.as_deref(), Some("kafka.internal"));
    }

    #[test]
    fn consumer_multi_broker_with_bootstrap_servers() {
        let consumer = ConsumerConfig::new("host1:9092", "group-a")
            .with_bootstrap_servers(["host1:9092", "host2:9092"]);

        assert_eq!(consumer.bootstrap_servers, vec!["host1:9092", "host2:9092"]);
    }

    #[test]
    fn admin_tls_builder_records_client_auth_paths() {
        let admin = AdminConfig::new("localhost:9093")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key");

        assert_eq!(admin.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            admin.tls.client_cert_path.as_deref(),
            Some(Path::new("/tmp/client.crt"))
        );
        assert_eq!(
            admin.tls.client_key_path.as_deref(),
            Some(Path::new("/tmp/client.key"))
        );
    }

    #[test]
    fn admin_and_consumer_builders_record_remaining_options() {
        let admin = AdminConfig::new("localhost:9092")
            .with_client_id("admin-a")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .with_security_protocol(SecurityProtocol::Plaintext)
            .with_tls(TlsConfig::new().with_ca_cert_path("/tmp/ca.pem"))
            .with_sasl_scram_sha_512("user-a", "secret-a")
            .with_request_timeout(Duration::from_secs(8));

        assert_eq!(admin.bootstrap_servers, vec!["host-a:9092", "host-b:9092"]);
        assert_eq!(admin.client_id, "admin-a");
        assert_eq!(admin.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(
            admin.tls.ca_cert_path.as_deref(),
            Some(Path::new("/tmp/ca.pem"))
        );
        assert_eq!(admin.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(admin.request_timeout, Duration::from_secs(8));

        let consumer = ConsumerConfig::new("localhost:9092", "group-a")
            .with_client_id("consumer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_ca_cert_path("/tmp/ca.pem")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_sasl_scram_sha_512("user-b", "secret-b")
            .with_request_timeout(Duration::from_secs(3))
            .with_metadata_max_age(Duration::from_secs(4))
            .with_retry_backoff(Duration::from_millis(5))
            .with_max_retries(6)
            .with_rebalance_timeout(Duration::from_secs(7))
            .with_fetch_max_wait(Duration::from_millis(8))
            .with_fetch_min_bytes(9)
            .with_fetch_max_bytes(10)
            .with_partition_max_bytes(11)
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(Duration::from_secs(12))
            .with_server_assignor("range")
            .with_rack_id("rack-a")
            .with_instance_id("instance-a");

        assert_eq!(consumer.client_id, "consumer-a");
        assert_eq!(consumer.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(consumer.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(consumer.request_timeout, Duration::from_secs(3));
        assert_eq!(consumer.metadata_max_age, Duration::from_secs(4));
        assert_eq!(consumer.retry_backoff, Duration::from_millis(5));
        assert_eq!(consumer.max_retries, 6);
        assert_eq!(consumer.rebalance_timeout, Duration::from_secs(7));
        assert_eq!(consumer.fetch_max_wait, Duration::from_millis(8));
        assert_eq!(consumer.fetch_min_bytes, 9);
        assert_eq!(consumer.fetch_max_bytes, 10);
        assert_eq!(consumer.partition_max_bytes, 11);
        assert_eq!(consumer.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(consumer.isolation_level, IsolationLevel::ReadCommitted);
        assert!(consumer.enable_auto_commit);
        assert_eq!(consumer.auto_commit_interval, Duration::from_secs(12));
        assert_eq!(consumer.server_assignor.as_deref(), Some("range"));
        assert_eq!(consumer.rack_id.as_deref(), Some("rack-a"));
        assert_eq!(consumer.instance_id.as_deref(), Some("instance-a"));
    }
}