1use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Duration;
20
21use kafka_protocol::records::Compression;
22
23use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
24use crate::network::{TcpConnector, TokioTcpConnector};
25use crate::types::ConsumerRebalanceListener;
26
/// Transport security mode for broker connections.
///
/// The four variants cover every combination of TLS on/off and SASL on/off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL.
    Plaintext,
    /// TLS without SASL.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL together with TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns `true` for the TLS-enabled variants (`Ssl`, `SaslSsl`).
    pub fn uses_tls(self) -> bool {
        match self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns `true` for the SASL-enabled variants (`SaslPlaintext`, `SaslSsl`).
    pub fn uses_sasl(self) -> bool {
        match self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }

    /// Returns the SASL-enabled counterpart of `self`, leaving the TLS choice untouched.
    fn with_sasl(self) -> Self {
        match self {
            Self::Ssl | Self::SaslSsl => Self::SaslSsl,
            Self::Plaintext | Self::SaslPlaintext => Self::SaslPlaintext,
        }
    }
}
59
/// SASL authentication mechanism; `Plain` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SaslMechanism {
    /// The `PLAIN` mechanism.
    #[default]
    Plain,
    /// The `SCRAM-SHA-256` mechanism.
    ScramSha256,
    /// The `SCRAM-SHA-512` mechanism.
    ScramSha512,
}

impl SaslMechanism {
    /// Returns the mechanism name as an upper-case string such as `"SCRAM-SHA-256"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::Plain => "PLAIN",
        }
    }

    /// Returns `true` for the two SCRAM variants.
    pub fn is_scram(self) -> bool {
        match self {
            Self::Plain => false,
            Self::ScramSha256 | Self::ScramSha512 => true,
        }
    }

    /// Returns the numeric code of a SCRAM variant (`1` for SHA-256, `2` for SHA-512), or `None`
    /// for `Plain`.
    ///
    /// NOTE(review): presumably these are the Kafka protocol's SCRAM mechanism codes — confirm
    /// against the call sites.
    pub fn scram_type(self) -> Option<i8> {
        match self {
            Self::ScramSha512 => Some(2),
            Self::ScramSha256 => Some(1),
            Self::Plain => None,
        }
    }
}
96
97#[derive(Debug, Clone, Default, PartialEq, Eq)]
98pub struct SaslConfig {
100 pub mechanism: SaslMechanism,
102 pub username: Option<String>,
104 pub password: Option<String>,
106 pub authorization_id: Option<String>,
108}
109
110impl SaslConfig {
111 pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
113 Self {
114 mechanism: SaslMechanism::Plain,
115 username: Some(username.into()),
116 password: Some(password.into()),
117 authorization_id: None,
118 }
119 }
120
121 pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
123 Self {
124 mechanism: SaslMechanism::ScramSha256,
125 username: Some(username.into()),
126 password: Some(password.into()),
127 authorization_id: None,
128 }
129 }
130
131 pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
133 Self {
134 mechanism: SaslMechanism::ScramSha512,
135 username: Some(username.into()),
136 password: Some(password.into()),
137 authorization_id: None,
138 }
139 }
140
141 pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
143 self.authorization_id = Some(authorization_id.into());
144 self
145 }
146}
147
/// File paths and server-name override used for TLS connections.
///
/// Every field is optional and starts out as `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// Path to a CA certificate file.
    pub ca_cert_path: Option<PathBuf>,
    /// Path to the client certificate file.
    pub client_cert_path: Option<PathBuf>,
    /// Path to the client private-key file.
    pub client_key_path: Option<PathBuf>,
    /// Server name override.
    pub server_name: Option<String>,
}

impl TlsConfig {
    /// Creates an empty TLS configuration (all fields `None`).
    pub fn new() -> Self {
        Self {
            ca_cert_path: None,
            client_cert_path: None,
            client_key_path: None,
            server_name: None,
        }
    }

    /// Returns the configuration with `ca_cert_path` set.
    pub fn with_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with `client_cert_path` set.
    pub fn with_client_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with `client_key_path` set.
    pub fn with_client_key_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_key_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with `server_name` set.
    pub fn with_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
191
/// Partitioning strategy selector for the producer.
///
/// Only one strategy exists at present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerPartitioner {
    /// The only available strategy; also what `ProducerConfig::new` selects.
    Default,
}
198
/// Compression codec selector for the producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerCompression {
    /// No compression.
    None,
    /// Gzip.
    Gzip,
    /// Snappy.
    Snappy,
    /// LZ4.
    Lz4,
    /// Zstandard.
    Zstd,
}
213
214impl From<ProducerCompression> for Compression {
215 fn from(value: ProducerCompression) -> Self {
216 match value {
217 ProducerCompression::None => Compression::None,
218 ProducerCompression::Gzip => Compression::Gzip,
219 ProducerCompression::Snappy => Compression::Snappy,
220 ProducerCompression::Lz4 => Compression::Lz4,
221 ProducerCompression::Zstd => Compression::Zstd,
222 }
223 }
224}
225
226#[derive(Debug, Clone)]
227pub struct ProducerConfig {
229 pub bootstrap_servers: Vec<String>,
231 pub client_id: String,
233 pub security_protocol: SecurityProtocol,
235 pub tls: TlsConfig,
237 pub sasl: SaslConfig,
239 pub acks: i16,
241 pub enable_idempotence: bool,
243 pub partitioner: ProducerPartitioner,
245 pub compression: ProducerCompression,
247 pub batch_size: usize,
249 pub linger: Duration,
251 pub delivery_timeout: Duration,
253 pub request_timeout: Duration,
255 pub metadata_max_age: Duration,
257 pub retry_backoff: Duration,
259 pub max_retries: usize,
261 pub max_in_flight_requests_per_connection: usize,
263 pub transactional_id: Option<String>,
265 pub transaction_timeout: Duration,
267 pub tcp_connector: Arc<dyn TcpConnector>,
269}
270
271impl ProducerConfig {
272 pub fn new(bootstrap_server: impl Into<String>) -> Self {
274 Self {
275 bootstrap_servers: vec![bootstrap_server.into()],
276 client_id: "rust-producer".to_owned(),
277 security_protocol: SecurityProtocol::Plaintext,
278 tls: TlsConfig::default(),
279 sasl: SaslConfig::default(),
280 acks: 1,
281 enable_idempotence: false,
282 partitioner: ProducerPartitioner::Default,
283 compression: ProducerCompression::None,
284 batch_size: 16 * 1024,
285 linger: Duration::ZERO,
286 delivery_timeout: Duration::from_secs(120),
287 request_timeout: Duration::from_secs(5),
288 metadata_max_age: Duration::from_secs(30),
289 retry_backoff: Duration::from_millis(250),
290 max_retries: 3,
291 max_in_flight_requests_per_connection: 5,
292 transactional_id: None,
293 transaction_timeout: Duration::from_secs(30),
294 tcp_connector: Arc::new(TokioTcpConnector),
295 }
296 }
297
298 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
300 self.client_id = client_id.into();
301 self
302 }
303
304 pub fn with_bootstrap_servers(
306 mut self,
307 servers: impl IntoIterator<Item = impl Into<String>>,
308 ) -> Self {
309 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
310 self
311 }
312
313 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
315 self.security_protocol = security_protocol;
316 self
317 }
318
319 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
321 self.security_protocol = if self.security_protocol.uses_sasl() {
322 SecurityProtocol::SaslSsl
323 } else {
324 SecurityProtocol::Ssl
325 };
326 self.tls = tls;
327 self
328 }
329
330 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
332 self.security_protocol = if self.security_protocol.uses_sasl() {
333 SecurityProtocol::SaslSsl
334 } else {
335 SecurityProtocol::Ssl
336 };
337 self.tls = self.tls.with_ca_cert_path(path);
338 self
339 }
340
341 pub fn with_tls_client_auth_paths(
343 mut self,
344 cert_path: impl Into<PathBuf>,
345 key_path: impl Into<PathBuf>,
346 ) -> Self {
347 self.security_protocol = if self.security_protocol.uses_sasl() {
348 SecurityProtocol::SaslSsl
349 } else {
350 SecurityProtocol::Ssl
351 };
352 self.tls = self
353 .tls
354 .with_client_cert_path(cert_path)
355 .with_client_key_path(key_path);
356 self
357 }
358
359 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
361 self.security_protocol = if self.security_protocol.uses_sasl() {
362 SecurityProtocol::SaslSsl
363 } else {
364 SecurityProtocol::Ssl
365 };
366 self.tls = self.tls.with_server_name(server_name);
367 self
368 }
369
370 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
372 self.security_protocol = self.security_protocol.with_sasl();
373 self.sasl = sasl;
374 self
375 }
376
377 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
379 self.with_sasl(SaslConfig::plain(username, password))
380 }
381
382 pub fn with_sasl_scram_sha_256(
384 self,
385 username: impl Into<String>,
386 password: impl Into<String>,
387 ) -> Self {
388 self.with_sasl(SaslConfig::scram_sha_256(username, password))
389 }
390
391 pub fn with_sasl_scram_sha_512(
393 self,
394 username: impl Into<String>,
395 password: impl Into<String>,
396 ) -> Self {
397 self.with_sasl(SaslConfig::scram_sha_512(username, password))
398 }
399
400 pub fn with_acks(mut self, acks: i16) -> Self {
402 self.acks = acks;
403 self
404 }
405
406 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
408 self.enable_idempotence = enable_idempotence;
409 if enable_idempotence {
410 self.acks = -1;
411 self.max_retries = self.max_retries.max(1);
412 }
413 self
414 }
415
416 pub fn with_partitioner(mut self, partitioner: ProducerPartitioner) -> Self {
418 self.partitioner = partitioner;
419 self
420 }
421
422 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
424 self.compression = compression;
425 self
426 }
427
428 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
430 self.batch_size = batch_size.max(1);
431 self
432 }
433
434 pub fn with_linger(mut self, linger: Duration) -> Self {
436 self.linger = linger;
437 self
438 }
439
440 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
442 self.delivery_timeout = delivery_timeout;
443 self
444 }
445
446 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
448 self.request_timeout = request_timeout;
449 self
450 }
451
452 pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
454 self.metadata_max_age = metadata_max_age;
455 self
456 }
457
458 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
460 self.retry_backoff = retry_backoff;
461 self
462 }
463
464 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
466 self.max_retries = max_retries;
467 self
468 }
469
470 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
472 self.max_in_flight_requests_per_connection = max_in_flight.max(1);
473 self
474 }
475
476 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
478 self.transactional_id = Some(transactional_id.into());
479 self.acks = -1;
480 self.enable_idempotence = true;
481 self
482 }
483
484 pub fn with_transaction_timeout(mut self, transaction_timeout: Duration) -> Self {
486 self.transaction_timeout = transaction_timeout;
487 self
488 }
489
490 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
492 self.tcp_connector = tcp_connector;
493 self
494 }
495
496 pub fn is_transactional(&self) -> bool {
498 self.transactional_id.is_some()
499 }
500
501 pub fn is_idempotent(&self) -> bool {
503 self.enable_idempotence
504 }
505}
506
507pub type ClientConfig = ProducerConfig;
509
510#[derive(Debug, Clone)]
511pub struct AdminConfig {
513 pub bootstrap_servers: Vec<String>,
515 pub client_id: String,
517 pub security_protocol: SecurityProtocol,
519 pub tls: TlsConfig,
521 pub sasl: SaslConfig,
523 pub request_timeout: Duration,
525 pub tcp_connector: Arc<dyn TcpConnector>,
527}
528
529impl AdminConfig {
530 pub fn new(bootstrap_server: impl Into<String>) -> Self {
532 Self {
533 bootstrap_servers: vec![bootstrap_server.into()],
534 client_id: "rust-admin".to_owned(),
535 security_protocol: SecurityProtocol::Plaintext,
536 tls: TlsConfig::default(),
537 sasl: SaslConfig::default(),
538 request_timeout: Duration::from_secs(5),
539 tcp_connector: Arc::new(TokioTcpConnector),
540 }
541 }
542
543 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
545 self.client_id = client_id.into();
546 self
547 }
548
549 pub fn with_bootstrap_servers(
551 mut self,
552 servers: impl IntoIterator<Item = impl Into<String>>,
553 ) -> Self {
554 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
555 self
556 }
557
558 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
560 self.security_protocol = security_protocol;
561 self
562 }
563
564 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
566 self.security_protocol = if self.security_protocol.uses_sasl() {
567 SecurityProtocol::SaslSsl
568 } else {
569 SecurityProtocol::Ssl
570 };
571 self.tls = tls;
572 self
573 }
574
575 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
577 self.security_protocol = if self.security_protocol.uses_sasl() {
578 SecurityProtocol::SaslSsl
579 } else {
580 SecurityProtocol::Ssl
581 };
582 self.tls = self.tls.with_ca_cert_path(path);
583 self
584 }
585
586 pub fn with_tls_client_auth_paths(
588 mut self,
589 cert_path: impl Into<PathBuf>,
590 key_path: impl Into<PathBuf>,
591 ) -> Self {
592 self.security_protocol = if self.security_protocol.uses_sasl() {
593 SecurityProtocol::SaslSsl
594 } else {
595 SecurityProtocol::Ssl
596 };
597 self.tls = self
598 .tls
599 .with_client_cert_path(cert_path)
600 .with_client_key_path(key_path);
601 self
602 }
603
604 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
606 self.security_protocol = if self.security_protocol.uses_sasl() {
607 SecurityProtocol::SaslSsl
608 } else {
609 SecurityProtocol::Ssl
610 };
611 self.tls = self.tls.with_server_name(server_name);
612 self
613 }
614
615 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
617 self.security_protocol = self.security_protocol.with_sasl();
618 self.sasl = sasl;
619 self
620 }
621
622 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
624 self.with_sasl(SaslConfig::plain(username, password))
625 }
626
627 pub fn with_sasl_scram_sha_256(
629 self,
630 username: impl Into<String>,
631 password: impl Into<String>,
632 ) -> Self {
633 self.with_sasl(SaslConfig::scram_sha_256(username, password))
634 }
635
636 pub fn with_sasl_scram_sha_512(
638 self,
639 username: impl Into<String>,
640 password: impl Into<String>,
641 ) -> Self {
642 self.with_sasl(SaslConfig::scram_sha_512(username, password))
643 }
644
645 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
647 self.request_timeout = request_timeout;
648 self
649 }
650
651 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
653 self.tcp_connector = tcp_connector;
654 self
655 }
656}
657
/// Offset reset policy selector for the consumer.
///
/// NOTE(review): presumably applied when the group has no usable committed offset — confirm
/// against the consumer implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Reset to the earliest offset; this is what `ConsumerConfig::new` selects.
    Earliest,
    /// Reset to the latest offset.
    Latest,
}
666
667#[derive(Debug, Clone, Copy, PartialEq, Eq)]
668pub enum IsolationLevel {
670 ReadUncommitted,
672 ReadCommitted,
674}
675
676impl IsolationLevel {
677 pub fn as_protocol_value(self) -> i8 {
679 match self {
680 Self::ReadUncommitted => READ_UNCOMMITTED,
681 Self::ReadCommitted => READ_COMMITTED,
682 }
683 }
684}
685
686#[derive(Debug, Clone)]
687pub struct ConsumerConfig {
689 pub bootstrap_servers: Vec<String>,
691 pub client_id: String,
693 pub group_id: String,
695 pub security_protocol: SecurityProtocol,
697 pub tls: TlsConfig,
699 pub sasl: SaslConfig,
701 pub request_timeout: Duration,
703 pub metadata_max_age: Duration,
705 pub retry_backoff: Duration,
707 pub max_retries: usize,
709 pub rebalance_timeout: Duration,
711 pub fetch_max_wait: Duration,
713 pub fetch_min_bytes: i32,
715 pub fetch_max_bytes: i32,
717 pub partition_max_bytes: i32,
719 pub auto_offset_reset: AutoOffsetReset,
721 pub isolation_level: IsolationLevel,
723 pub enable_auto_commit: bool,
725 pub auto_commit_interval: Duration,
727 pub server_assignor: Option<String>,
729 pub rack_id: Option<String>,
731 pub instance_id: Option<String>,
733 pub rebalance_listener: Option<ConsumerRebalanceListener>,
735 pub tcp_connector: Arc<dyn TcpConnector>,
737}
738
739impl ConsumerConfig {
740 pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
742 Self {
743 bootstrap_servers: vec![bootstrap_server.into()],
744 client_id: "rust-consumer".to_owned(),
745 group_id: group_id.into(),
746 security_protocol: SecurityProtocol::Plaintext,
747 tls: TlsConfig::default(),
748 sasl: SaslConfig::default(),
749 request_timeout: Duration::from_secs(5),
750 metadata_max_age: Duration::from_secs(30),
751 retry_backoff: Duration::from_millis(250),
752 max_retries: 3,
753 rebalance_timeout: Duration::from_secs(30),
754 fetch_max_wait: Duration::from_millis(500),
755 fetch_min_bytes: 1,
756 fetch_max_bytes: 50 * 1024 * 1024,
757 partition_max_bytes: 1024 * 1024,
758 auto_offset_reset: AutoOffsetReset::Earliest,
759 isolation_level: IsolationLevel::ReadUncommitted,
760 enable_auto_commit: false,
761 auto_commit_interval: Duration::from_secs(5),
762 server_assignor: None,
763 rack_id: None,
764 instance_id: None,
765 rebalance_listener: None,
766 tcp_connector: Arc::new(TokioTcpConnector),
767 }
768 }
769
770 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
772 self.client_id = client_id.into();
773 self
774 }
775
776 pub fn with_bootstrap_servers(
778 mut self,
779 servers: impl IntoIterator<Item = impl Into<String>>,
780 ) -> Self {
781 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
782 self
783 }
784
785 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
787 self.security_protocol = security_protocol;
788 self
789 }
790
791 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
793 self.security_protocol = if self.security_protocol.uses_sasl() {
794 SecurityProtocol::SaslSsl
795 } else {
796 SecurityProtocol::Ssl
797 };
798 self.tls = tls;
799 self
800 }
801
802 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
804 self.security_protocol = if self.security_protocol.uses_sasl() {
805 SecurityProtocol::SaslSsl
806 } else {
807 SecurityProtocol::Ssl
808 };
809 self.tls = self.tls.with_ca_cert_path(path);
810 self
811 }
812
813 pub fn with_tls_client_auth_paths(
815 mut self,
816 cert_path: impl Into<PathBuf>,
817 key_path: impl Into<PathBuf>,
818 ) -> Self {
819 self.security_protocol = if self.security_protocol.uses_sasl() {
820 SecurityProtocol::SaslSsl
821 } else {
822 SecurityProtocol::Ssl
823 };
824 self.tls = self
825 .tls
826 .with_client_cert_path(cert_path)
827 .with_client_key_path(key_path);
828 self
829 }
830
831 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
833 self.security_protocol = if self.security_protocol.uses_sasl() {
834 SecurityProtocol::SaslSsl
835 } else {
836 SecurityProtocol::Ssl
837 };
838 self.tls = self.tls.with_server_name(server_name);
839 self
840 }
841
842 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
844 self.security_protocol = self.security_protocol.with_sasl();
845 self.sasl = sasl;
846 self
847 }
848
849 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
851 self.with_sasl(SaslConfig::plain(username, password))
852 }
853
854 pub fn with_sasl_scram_sha_256(
856 self,
857 username: impl Into<String>,
858 password: impl Into<String>,
859 ) -> Self {
860 self.with_sasl(SaslConfig::scram_sha_256(username, password))
861 }
862
863 pub fn with_sasl_scram_sha_512(
865 self,
866 username: impl Into<String>,
867 password: impl Into<String>,
868 ) -> Self {
869 self.with_sasl(SaslConfig::scram_sha_512(username, password))
870 }
871
872 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
874 self.request_timeout = request_timeout;
875 self
876 }
877
878 pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
880 self.metadata_max_age = metadata_max_age;
881 self
882 }
883
884 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
886 self.retry_backoff = retry_backoff;
887 self
888 }
889
890 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
892 self.max_retries = max_retries;
893 self
894 }
895
896 pub fn with_rebalance_timeout(mut self, rebalance_timeout: Duration) -> Self {
898 self.rebalance_timeout = rebalance_timeout;
899 self
900 }
901
902 pub fn with_fetch_max_wait(mut self, fetch_max_wait: Duration) -> Self {
904 self.fetch_max_wait = fetch_max_wait;
905 self
906 }
907
908 pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
910 self.fetch_min_bytes = fetch_min_bytes;
911 self
912 }
913
914 pub fn with_fetch_max_bytes(mut self, fetch_max_bytes: i32) -> Self {
916 self.fetch_max_bytes = fetch_max_bytes;
917 self
918 }
919
920 pub fn with_partition_max_bytes(mut self, partition_max_bytes: i32) -> Self {
922 self.partition_max_bytes = partition_max_bytes;
923 self
924 }
925
926 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
928 self.auto_offset_reset = auto_offset_reset;
929 self
930 }
931
932 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
934 self.isolation_level = isolation_level;
935 self
936 }
937
938 pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
940 self.enable_auto_commit = enable_auto_commit;
941 self
942 }
943
944 pub fn with_auto_commit_interval(mut self, auto_commit_interval: Duration) -> Self {
946 self.auto_commit_interval = auto_commit_interval;
947 self
948 }
949
950 pub fn with_server_assignor(mut self, server_assignor: impl Into<String>) -> Self {
952 self.server_assignor = Some(server_assignor.into());
953 self
954 }
955
956 pub fn with_rack_id(mut self, rack_id: impl Into<String>) -> Self {
958 self.rack_id = Some(rack_id.into());
959 self
960 }
961
962 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
964 self.instance_id = Some(instance_id.into());
965 self
966 }
967
968 pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
970 self.rebalance_listener = Some(listener);
971 self
972 }
973
974 pub fn with_rebalance_callback(
976 self,
977 callback: impl Fn(crate::types::ConsumerRebalanceEvent) + Send + Sync + 'static,
978 ) -> Self {
979 self.with_rebalance_listener(ConsumerRebalanceListener::new(callback))
980 }
981
982 pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
984 self.tcp_connector = tcp_connector;
985 self
986 }
987}
988
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    /// Builds the `Option<&Path>` shape that `Option<PathBuf>::as_deref` yields.
    fn some_path(raw: &str) -> Option<&Path> {
        Some(Path::new(raw))
    }

    // A TLS builder alone moves a plaintext producer to `Ssl`.
    #[test]
    fn producer_tls_builder_enables_ssl() {
        let ca = "/tmp/cluster-ca.pem";
        let config = ProducerConfig::new("localhost:9093").with_tls_ca_cert_path(ca);

        assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(config.tls.ca_cert_path.as_deref(), some_path(ca));
    }

    // A SASL builder alone moves a plaintext producer to `SaslPlaintext`.
    #[test]
    fn producer_sasl_plain_builder_enables_sasl_plaintext() {
        let config = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");

        assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
        let sasl = &config.sasl;
        assert_eq!(sasl.mechanism, SaslMechanism::Plain);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    #[test]
    fn producer_sasl_scram_builder_enables_sasl_plaintext() {
        let config =
            ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256("user-a", "secret-a");

        assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
        let sasl = &config.sasl;
        assert_eq!(sasl.mechanism, SaslMechanism::ScramSha256);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    // The TLS and SASL builders compose to `SaslSsl` regardless of call order.
    #[test]
    fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
        let sasl_then_tls = ProducerConfig::new("localhost:9093")
            .with_sasl_plain("user-a", "secret-a")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
        assert_eq!(sasl_then_tls.security_protocol, SecurityProtocol::SaslSsl);

        let tls_then_sasl = ProducerConfig::new("localhost:9093")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
            .with_sasl_plain("user-a", "secret-a");
        assert_eq!(tls_then_sasl.security_protocol, SecurityProtocol::SaslSsl);
    }

    #[test]
    fn producer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092", "host3:9092"];
        let config = ProducerConfig::new("host1:9092").with_bootstrap_servers(brokers);

        assert_eq!(
            config.bootstrap_servers,
            vec!["host1:9092", "host2:9092", "host3:9092"]
        );
    }

    #[test]
    fn producer_builder_records_tuning_and_transaction_options() {
        let config = ProducerConfig::new("localhost:9092")
            .with_client_id("producer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_tls_server_name("kafka.internal")
            .with_acks(-1)
            .with_enable_idempotence(true)
            .with_partitioner(ProducerPartitioner::Default)
            .with_compression(ProducerCompression::Zstd)
            .with_batch_size(0)
            .with_linger(Duration::from_millis(50))
            .with_delivery_timeout(Duration::from_secs(10))
            .with_request_timeout(Duration::from_secs(2))
            .with_metadata_max_age(Duration::from_secs(60))
            .with_retry_backoff(Duration::from_millis(75))
            .with_max_retries(7)
            .with_transactional_id("tx-a")
            .with_transaction_timeout(Duration::from_secs(45));

        // Identity and transport.
        assert_eq!(config.client_id, "producer-a");
        assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            config.tls.client_cert_path.as_deref(),
            some_path("/tmp/client.crt")
        );
        assert_eq!(
            config.tls.client_key_path.as_deref(),
            some_path("/tmp/client.key")
        );
        assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));

        // Delivery semantics.
        assert_eq!(config.acks, -1);
        assert!(config.is_idempotent());
        assert_eq!(config.compression, ProducerCompression::Zstd);
        // A batch size of zero is clamped up to one.
        assert_eq!(config.batch_size, 1);

        // Timing and retries.
        assert_eq!(config.linger, Duration::from_millis(50));
        assert_eq!(config.delivery_timeout, Duration::from_secs(10));
        assert_eq!(config.request_timeout, Duration::from_secs(2));
        assert_eq!(config.metadata_max_age, Duration::from_secs(60));
        assert_eq!(config.retry_backoff, Duration::from_millis(75));
        assert_eq!(config.max_retries, 7);

        // Transactions.
        assert_eq!(config.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(config.transaction_timeout, Duration::from_secs(45));
        assert!(config.is_transactional());
    }

    #[test]
    fn consumer_tls_builder_keeps_override_server_name() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let config = ConsumerConfig::new("localhost:9093", "group-a").with_tls(tls);

        assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));
    }

    #[test]
    fn consumer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092"];
        let config = ConsumerConfig::new("host1:9092", "group-a").with_bootstrap_servers(brokers);

        assert_eq!(config.bootstrap_servers, vec!["host1:9092", "host2:9092"]);
    }

    #[test]
    fn admin_tls_builder_records_client_auth_paths() {
        let (cert, key) = ("/tmp/client.crt", "/tmp/client.key");
        let config = AdminConfig::new("localhost:9093").with_tls_client_auth_paths(cert, key);

        assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(config.tls.client_cert_path.as_deref(), some_path(cert));
        assert_eq!(config.tls.client_key_path.as_deref(), some_path(key));
    }

    #[test]
    fn admin_and_consumer_builders_record_remaining_options() {
        // Admin: plaintext -> TLS -> SASL ends up as SASL over TLS.
        let admin = AdminConfig::new("localhost:9092")
            .with_client_id("admin-a")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .with_security_protocol(SecurityProtocol::Plaintext)
            .with_tls(TlsConfig::new().with_ca_cert_path("/tmp/ca.pem"))
            .with_sasl_scram_sha_512("user-a", "secret-a")
            .with_request_timeout(Duration::from_secs(8));

        assert_eq!(admin.bootstrap_servers, vec!["host-a:9092", "host-b:9092"]);
        assert_eq!(admin.client_id, "admin-a");
        assert_eq!(admin.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(admin.tls.ca_cert_path.as_deref(), some_path("/tmp/ca.pem"));
        assert_eq!(admin.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(admin.request_timeout, Duration::from_secs(8));

        // Consumer: every remaining builder is exercised once with a distinct value.
        let consumer = ConsumerConfig::new("localhost:9092", "group-a")
            .with_client_id("consumer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_ca_cert_path("/tmp/ca.pem")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_sasl_scram_sha_512("user-b", "secret-b")
            .with_request_timeout(Duration::from_secs(3))
            .with_metadata_max_age(Duration::from_secs(4))
            .with_retry_backoff(Duration::from_millis(5))
            .with_max_retries(6)
            .with_rebalance_timeout(Duration::from_secs(7))
            .with_fetch_max_wait(Duration::from_millis(8))
            .with_fetch_min_bytes(9)
            .with_fetch_max_bytes(10)
            .with_partition_max_bytes(11)
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(Duration::from_secs(12))
            .with_server_assignor("range")
            .with_rack_id("rack-a")
            .with_instance_id("instance-a");

        assert_eq!(consumer.client_id, "consumer-a");
        assert_eq!(consumer.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(consumer.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(consumer.request_timeout, Duration::from_secs(3));
        assert_eq!(consumer.metadata_max_age, Duration::from_secs(4));
        assert_eq!(consumer.retry_backoff, Duration::from_millis(5));
        assert_eq!(consumer.max_retries, 6);
        assert_eq!(consumer.rebalance_timeout, Duration::from_secs(7));
        assert_eq!(consumer.fetch_max_wait, Duration::from_millis(8));
        assert_eq!(consumer.fetch_min_bytes, 9);
        assert_eq!(consumer.fetch_max_bytes, 10);
        assert_eq!(consumer.partition_max_bytes, 11);
        assert_eq!(consumer.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(consumer.isolation_level, IsolationLevel::ReadCommitted);
        assert!(consumer.enable_auto_commit);
        assert_eq!(consumer.auto_commit_interval, Duration::from_secs(12));
        assert_eq!(consumer.server_assignor.as_deref(), Some("range"));
        assert_eq!(consumer.rack_id.as_deref(), Some("rack-a"));
        assert_eq!(consumer.instance_id.as_deref(), Some("instance-a"));
    }
}