1use std::path::PathBuf;
18use std::time::Duration;
19
20use kafka_protocol::records::Compression;
21
22use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum SecurityProtocol {
27 Plaintext,
29 Ssl,
31 SaslPlaintext,
33 SaslSsl,
35}
36
37impl SecurityProtocol {
38 pub fn uses_tls(self) -> bool {
40 matches!(self, Self::Ssl | Self::SaslSsl)
41 }
42
43 pub fn uses_sasl(self) -> bool {
45 matches!(self, Self::SaslPlaintext | Self::SaslSsl)
46 }
47
48 fn with_sasl(self) -> Self {
49 if self.uses_tls() {
50 Self::SaslSsl
51 } else {
52 Self::SaslPlaintext
53 }
54 }
55}
56
57#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
58pub enum SaslMechanism {
60 #[default]
61 Plain,
63 ScramSha256,
65 ScramSha512,
67}
68
69impl SaslMechanism {
70 pub fn as_str(self) -> &'static str {
72 match self {
73 Self::Plain => "PLAIN",
74 Self::ScramSha256 => "SCRAM-SHA-256",
75 Self::ScramSha512 => "SCRAM-SHA-512",
76 }
77 }
78
79 pub fn is_scram(self) -> bool {
81 matches!(self, Self::ScramSha256 | Self::ScramSha512)
82 }
83
84 pub fn scram_type(self) -> Option<i8> {
86 match self {
87 Self::Plain => None,
88 Self::ScramSha256 => Some(1),
89 Self::ScramSha512 => Some(2),
90 }
91 }
92}
93
94#[derive(Debug, Clone, Default, PartialEq, Eq)]
95pub struct SaslConfig {
97 pub mechanism: SaslMechanism,
99 pub username: Option<String>,
101 pub password: Option<String>,
103 pub authorization_id: Option<String>,
105}
106
107impl SaslConfig {
108 pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
110 Self {
111 mechanism: SaslMechanism::Plain,
112 username: Some(username.into()),
113 password: Some(password.into()),
114 authorization_id: None,
115 }
116 }
117
118 pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
120 Self {
121 mechanism: SaslMechanism::ScramSha256,
122 username: Some(username.into()),
123 password: Some(password.into()),
124 authorization_id: None,
125 }
126 }
127
128 pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
130 Self {
131 mechanism: SaslMechanism::ScramSha512,
132 username: Some(username.into()),
133 password: Some(password.into()),
134 authorization_id: None,
135 }
136 }
137
138 pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
140 self.authorization_id = Some(authorization_id.into());
141 self
142 }
143}
144
145#[derive(Debug, Clone, Default, PartialEq, Eq)]
146pub struct TlsConfig {
148 pub ca_cert_path: Option<PathBuf>,
150 pub client_cert_path: Option<PathBuf>,
152 pub client_key_path: Option<PathBuf>,
154 pub server_name: Option<String>,
156}
157
158impl TlsConfig {
159 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn with_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
166 self.ca_cert_path = Some(path.into());
167 self
168 }
169
170 pub fn with_client_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
172 self.client_cert_path = Some(path.into());
173 self
174 }
175
176 pub fn with_client_key_path(mut self, path: impl Into<PathBuf>) -> Self {
178 self.client_key_path = Some(path.into());
179 self
180 }
181
182 pub fn with_server_name(mut self, server_name: impl Into<String>) -> Self {
184 self.server_name = Some(server_name.into());
185 self
186 }
187}
188
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190pub enum ProducerPartitioner {
192 Default,
194}
195
196#[derive(Debug, Clone, Copy, PartialEq, Eq)]
197pub enum ProducerCompression {
199 None,
201 Gzip,
203 Snappy,
205 Lz4,
207 Zstd,
209}
210
211impl From<ProducerCompression> for Compression {
212 fn from(value: ProducerCompression) -> Self {
213 match value {
214 ProducerCompression::None => Compression::None,
215 ProducerCompression::Gzip => Compression::Gzip,
216 ProducerCompression::Snappy => Compression::Snappy,
217 ProducerCompression::Lz4 => Compression::Lz4,
218 ProducerCompression::Zstd => Compression::Zstd,
219 }
220 }
221}
222
223#[derive(Debug, Clone)]
224pub struct ProducerConfig {
226 pub bootstrap_servers: Vec<String>,
228 pub client_id: String,
230 pub security_protocol: SecurityProtocol,
232 pub tls: TlsConfig,
234 pub sasl: SaslConfig,
236 pub acks: i16,
238 pub enable_idempotence: bool,
240 pub partitioner: ProducerPartitioner,
242 pub compression: ProducerCompression,
244 pub batch_size: usize,
246 pub linger: Duration,
248 pub delivery_timeout: Duration,
250 pub request_timeout: Duration,
252 pub metadata_max_age: Duration,
254 pub retry_backoff: Duration,
256 pub max_retries: usize,
258 pub max_in_flight_requests_per_connection: usize,
260 pub transactional_id: Option<String>,
262 pub transaction_timeout: Duration,
264}
265
266impl ProducerConfig {
267 pub fn new(bootstrap_server: impl Into<String>) -> Self {
269 Self {
270 bootstrap_servers: vec![bootstrap_server.into()],
271 client_id: "rust-producer".to_owned(),
272 security_protocol: SecurityProtocol::Plaintext,
273 tls: TlsConfig::default(),
274 sasl: SaslConfig::default(),
275 acks: 1,
276 enable_idempotence: false,
277 partitioner: ProducerPartitioner::Default,
278 compression: ProducerCompression::None,
279 batch_size: 16 * 1024,
280 linger: Duration::ZERO,
281 delivery_timeout: Duration::from_secs(120),
282 request_timeout: Duration::from_secs(5),
283 metadata_max_age: Duration::from_secs(30),
284 retry_backoff: Duration::from_millis(250),
285 max_retries: 3,
286 max_in_flight_requests_per_connection: 5,
287 transactional_id: None,
288 transaction_timeout: Duration::from_secs(30),
289 }
290 }
291
292 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
294 self.client_id = client_id.into();
295 self
296 }
297
298 pub fn with_bootstrap_servers(
300 mut self,
301 servers: impl IntoIterator<Item = impl Into<String>>,
302 ) -> Self {
303 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
304 self
305 }
306
307 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
309 self.security_protocol = security_protocol;
310 self
311 }
312
313 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
315 self.security_protocol = if self.security_protocol.uses_sasl() {
316 SecurityProtocol::SaslSsl
317 } else {
318 SecurityProtocol::Ssl
319 };
320 self.tls = tls;
321 self
322 }
323
324 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
326 self.security_protocol = if self.security_protocol.uses_sasl() {
327 SecurityProtocol::SaslSsl
328 } else {
329 SecurityProtocol::Ssl
330 };
331 self.tls = self.tls.with_ca_cert_path(path);
332 self
333 }
334
335 pub fn with_tls_client_auth_paths(
337 mut self,
338 cert_path: impl Into<PathBuf>,
339 key_path: impl Into<PathBuf>,
340 ) -> Self {
341 self.security_protocol = if self.security_protocol.uses_sasl() {
342 SecurityProtocol::SaslSsl
343 } else {
344 SecurityProtocol::Ssl
345 };
346 self.tls = self
347 .tls
348 .with_client_cert_path(cert_path)
349 .with_client_key_path(key_path);
350 self
351 }
352
353 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
355 self.security_protocol = if self.security_protocol.uses_sasl() {
356 SecurityProtocol::SaslSsl
357 } else {
358 SecurityProtocol::Ssl
359 };
360 self.tls = self.tls.with_server_name(server_name);
361 self
362 }
363
364 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
366 self.security_protocol = self.security_protocol.with_sasl();
367 self.sasl = sasl;
368 self
369 }
370
371 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
373 self.with_sasl(SaslConfig::plain(username, password))
374 }
375
376 pub fn with_sasl_scram_sha_256(
378 self,
379 username: impl Into<String>,
380 password: impl Into<String>,
381 ) -> Self {
382 self.with_sasl(SaslConfig::scram_sha_256(username, password))
383 }
384
385 pub fn with_sasl_scram_sha_512(
387 self,
388 username: impl Into<String>,
389 password: impl Into<String>,
390 ) -> Self {
391 self.with_sasl(SaslConfig::scram_sha_512(username, password))
392 }
393
394 pub fn with_acks(mut self, acks: i16) -> Self {
396 self.acks = acks;
397 self
398 }
399
400 pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
402 self.enable_idempotence = enable_idempotence;
403 if enable_idempotence {
404 self.acks = -1;
405 self.max_retries = self.max_retries.max(1);
406 }
407 self
408 }
409
410 pub fn with_partitioner(mut self, partitioner: ProducerPartitioner) -> Self {
412 self.partitioner = partitioner;
413 self
414 }
415
416 pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
418 self.compression = compression;
419 self
420 }
421
422 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
424 self.batch_size = batch_size.max(1);
425 self
426 }
427
428 pub fn with_linger(mut self, linger: Duration) -> Self {
430 self.linger = linger;
431 self
432 }
433
434 pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
436 self.delivery_timeout = delivery_timeout;
437 self
438 }
439
440 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
442 self.request_timeout = request_timeout;
443 self
444 }
445
446 pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
448 self.metadata_max_age = metadata_max_age;
449 self
450 }
451
452 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
454 self.retry_backoff = retry_backoff;
455 self
456 }
457
458 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
460 self.max_retries = max_retries;
461 self
462 }
463
464 pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
466 self.max_in_flight_requests_per_connection = max_in_flight.max(1);
467 self
468 }
469
470 pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
472 self.transactional_id = Some(transactional_id.into());
473 self.acks = -1;
474 self.enable_idempotence = true;
475 self
476 }
477
478 pub fn with_transaction_timeout(mut self, transaction_timeout: Duration) -> Self {
480 self.transaction_timeout = transaction_timeout;
481 self
482 }
483
484 pub fn is_transactional(&self) -> bool {
486 self.transactional_id.is_some()
487 }
488
489 pub fn is_idempotent(&self) -> bool {
491 self.enable_idempotence
492 }
493}
494
495pub type ClientConfig = ProducerConfig;
497
498#[derive(Debug, Clone)]
499pub struct AdminConfig {
501 pub bootstrap_servers: Vec<String>,
503 pub client_id: String,
505 pub security_protocol: SecurityProtocol,
507 pub tls: TlsConfig,
509 pub sasl: SaslConfig,
511 pub request_timeout: Duration,
513}
514
515impl AdminConfig {
516 pub fn new(bootstrap_server: impl Into<String>) -> Self {
518 Self {
519 bootstrap_servers: vec![bootstrap_server.into()],
520 client_id: "rust-admin".to_owned(),
521 security_protocol: SecurityProtocol::Plaintext,
522 tls: TlsConfig::default(),
523 sasl: SaslConfig::default(),
524 request_timeout: Duration::from_secs(5),
525 }
526 }
527
528 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
530 self.client_id = client_id.into();
531 self
532 }
533
534 pub fn with_bootstrap_servers(
536 mut self,
537 servers: impl IntoIterator<Item = impl Into<String>>,
538 ) -> Self {
539 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
540 self
541 }
542
543 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
545 self.security_protocol = security_protocol;
546 self
547 }
548
549 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
551 self.security_protocol = if self.security_protocol.uses_sasl() {
552 SecurityProtocol::SaslSsl
553 } else {
554 SecurityProtocol::Ssl
555 };
556 self.tls = tls;
557 self
558 }
559
560 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
562 self.security_protocol = if self.security_protocol.uses_sasl() {
563 SecurityProtocol::SaslSsl
564 } else {
565 SecurityProtocol::Ssl
566 };
567 self.tls = self.tls.with_ca_cert_path(path);
568 self
569 }
570
571 pub fn with_tls_client_auth_paths(
573 mut self,
574 cert_path: impl Into<PathBuf>,
575 key_path: impl Into<PathBuf>,
576 ) -> Self {
577 self.security_protocol = if self.security_protocol.uses_sasl() {
578 SecurityProtocol::SaslSsl
579 } else {
580 SecurityProtocol::Ssl
581 };
582 self.tls = self
583 .tls
584 .with_client_cert_path(cert_path)
585 .with_client_key_path(key_path);
586 self
587 }
588
589 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
591 self.security_protocol = if self.security_protocol.uses_sasl() {
592 SecurityProtocol::SaslSsl
593 } else {
594 SecurityProtocol::Ssl
595 };
596 self.tls = self.tls.with_server_name(server_name);
597 self
598 }
599
600 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
602 self.security_protocol = self.security_protocol.with_sasl();
603 self.sasl = sasl;
604 self
605 }
606
607 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
609 self.with_sasl(SaslConfig::plain(username, password))
610 }
611
612 pub fn with_sasl_scram_sha_256(
614 self,
615 username: impl Into<String>,
616 password: impl Into<String>,
617 ) -> Self {
618 self.with_sasl(SaslConfig::scram_sha_256(username, password))
619 }
620
621 pub fn with_sasl_scram_sha_512(
623 self,
624 username: impl Into<String>,
625 password: impl Into<String>,
626 ) -> Self {
627 self.with_sasl(SaslConfig::scram_sha_512(username, password))
628 }
629
630 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
632 self.request_timeout = request_timeout;
633 self
634 }
635}
636
637#[derive(Debug, Clone, Copy, PartialEq, Eq)]
638pub enum AutoOffsetReset {
640 Earliest,
642 Latest,
644}
645
646#[derive(Debug, Clone, Copy, PartialEq, Eq)]
647pub enum IsolationLevel {
649 ReadUncommitted,
651 ReadCommitted,
653}
654
655impl IsolationLevel {
656 pub fn as_protocol_value(self) -> i8 {
658 match self {
659 Self::ReadUncommitted => READ_UNCOMMITTED,
660 Self::ReadCommitted => READ_COMMITTED,
661 }
662 }
663}
664
665#[derive(Debug, Clone)]
666pub struct ConsumerConfig {
668 pub bootstrap_servers: Vec<String>,
670 pub client_id: String,
672 pub group_id: String,
674 pub security_protocol: SecurityProtocol,
676 pub tls: TlsConfig,
678 pub sasl: SaslConfig,
680 pub request_timeout: Duration,
682 pub metadata_max_age: Duration,
684 pub retry_backoff: Duration,
686 pub max_retries: usize,
688 pub rebalance_timeout: Duration,
690 pub fetch_max_wait: Duration,
692 pub fetch_min_bytes: i32,
694 pub fetch_max_bytes: i32,
696 pub partition_max_bytes: i32,
698 pub auto_offset_reset: AutoOffsetReset,
700 pub isolation_level: IsolationLevel,
702 pub enable_auto_commit: bool,
704 pub auto_commit_interval: Duration,
706 pub server_assignor: Option<String>,
708 pub rack_id: Option<String>,
710 pub instance_id: Option<String>,
712}
713
714impl ConsumerConfig {
715 pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
717 Self {
718 bootstrap_servers: vec![bootstrap_server.into()],
719 client_id: "rust-consumer".to_owned(),
720 group_id: group_id.into(),
721 security_protocol: SecurityProtocol::Plaintext,
722 tls: TlsConfig::default(),
723 sasl: SaslConfig::default(),
724 request_timeout: Duration::from_secs(5),
725 metadata_max_age: Duration::from_secs(30),
726 retry_backoff: Duration::from_millis(250),
727 max_retries: 3,
728 rebalance_timeout: Duration::from_secs(30),
729 fetch_max_wait: Duration::from_millis(500),
730 fetch_min_bytes: 1,
731 fetch_max_bytes: 50 * 1024 * 1024,
732 partition_max_bytes: 1024 * 1024,
733 auto_offset_reset: AutoOffsetReset::Earliest,
734 isolation_level: IsolationLevel::ReadUncommitted,
735 enable_auto_commit: false,
736 auto_commit_interval: Duration::from_secs(5),
737 server_assignor: None,
738 rack_id: None,
739 instance_id: None,
740 }
741 }
742
743 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
745 self.client_id = client_id.into();
746 self
747 }
748
749 pub fn with_bootstrap_servers(
751 mut self,
752 servers: impl IntoIterator<Item = impl Into<String>>,
753 ) -> Self {
754 self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
755 self
756 }
757
758 pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
760 self.security_protocol = security_protocol;
761 self
762 }
763
764 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
766 self.security_protocol = if self.security_protocol.uses_sasl() {
767 SecurityProtocol::SaslSsl
768 } else {
769 SecurityProtocol::Ssl
770 };
771 self.tls = tls;
772 self
773 }
774
775 pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
777 self.security_protocol = if self.security_protocol.uses_sasl() {
778 SecurityProtocol::SaslSsl
779 } else {
780 SecurityProtocol::Ssl
781 };
782 self.tls = self.tls.with_ca_cert_path(path);
783 self
784 }
785
786 pub fn with_tls_client_auth_paths(
788 mut self,
789 cert_path: impl Into<PathBuf>,
790 key_path: impl Into<PathBuf>,
791 ) -> Self {
792 self.security_protocol = if self.security_protocol.uses_sasl() {
793 SecurityProtocol::SaslSsl
794 } else {
795 SecurityProtocol::Ssl
796 };
797 self.tls = self
798 .tls
799 .with_client_cert_path(cert_path)
800 .with_client_key_path(key_path);
801 self
802 }
803
804 pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
806 self.security_protocol = if self.security_protocol.uses_sasl() {
807 SecurityProtocol::SaslSsl
808 } else {
809 SecurityProtocol::Ssl
810 };
811 self.tls = self.tls.with_server_name(server_name);
812 self
813 }
814
815 pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
817 self.security_protocol = self.security_protocol.with_sasl();
818 self.sasl = sasl;
819 self
820 }
821
822 pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
824 self.with_sasl(SaslConfig::plain(username, password))
825 }
826
827 pub fn with_sasl_scram_sha_256(
829 self,
830 username: impl Into<String>,
831 password: impl Into<String>,
832 ) -> Self {
833 self.with_sasl(SaslConfig::scram_sha_256(username, password))
834 }
835
836 pub fn with_sasl_scram_sha_512(
838 self,
839 username: impl Into<String>,
840 password: impl Into<String>,
841 ) -> Self {
842 self.with_sasl(SaslConfig::scram_sha_512(username, password))
843 }
844
845 pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
847 self.request_timeout = request_timeout;
848 self
849 }
850
851 pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
853 self.metadata_max_age = metadata_max_age;
854 self
855 }
856
857 pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
859 self.retry_backoff = retry_backoff;
860 self
861 }
862
863 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
865 self.max_retries = max_retries;
866 self
867 }
868
869 pub fn with_rebalance_timeout(mut self, rebalance_timeout: Duration) -> Self {
871 self.rebalance_timeout = rebalance_timeout;
872 self
873 }
874
875 pub fn with_fetch_max_wait(mut self, fetch_max_wait: Duration) -> Self {
877 self.fetch_max_wait = fetch_max_wait;
878 self
879 }
880
881 pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
883 self.fetch_min_bytes = fetch_min_bytes;
884 self
885 }
886
887 pub fn with_fetch_max_bytes(mut self, fetch_max_bytes: i32) -> Self {
889 self.fetch_max_bytes = fetch_max_bytes;
890 self
891 }
892
893 pub fn with_partition_max_bytes(mut self, partition_max_bytes: i32) -> Self {
895 self.partition_max_bytes = partition_max_bytes;
896 self
897 }
898
899 pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
901 self.auto_offset_reset = auto_offset_reset;
902 self
903 }
904
905 pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
907 self.isolation_level = isolation_level;
908 self
909 }
910
911 pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
913 self.enable_auto_commit = enable_auto_commit;
914 self
915 }
916
917 pub fn with_auto_commit_interval(mut self, auto_commit_interval: Duration) -> Self {
919 self.auto_commit_interval = auto_commit_interval;
920 self
921 }
922
923 pub fn with_server_assignor(mut self, server_assignor: impl Into<String>) -> Self {
925 self.server_assignor = Some(server_assignor.into());
926 self
927 }
928
929 pub fn with_rack_id(mut self, rack_id: impl Into<String>) -> Self {
931 self.rack_id = Some(rack_id.into());
932 self
933 }
934
935 pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
937 self.instance_id = Some(instance_id.into());
938 self
939 }
940}
941
942#[cfg(test)]
943mod tests {
944 use super::*;
945
946 #[test]
947 fn producer_tls_builder_enables_ssl() {
948 let config =
949 ProducerConfig::new("localhost:9093").with_tls_ca_cert_path("/tmp/cluster-ca.pem");
950 assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
951 assert_eq!(
952 config.tls.ca_cert_path.as_deref(),
953 Some(std::path::Path::new("/tmp/cluster-ca.pem"))
954 );
955 }
956
957 #[test]
958 fn producer_sasl_plain_builder_enables_sasl_plaintext() {
959 let config = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");
960 assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
961 assert_eq!(config.sasl.mechanism, SaslMechanism::Plain);
962 assert_eq!(config.sasl.username.as_deref(), Some("user-a"));
963 assert_eq!(config.sasl.password.as_deref(), Some("secret-a"));
964 }
965
966 #[test]
967 fn producer_sasl_scram_builder_enables_sasl_plaintext() {
968 let config =
969 ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256("user-a", "secret-a");
970 assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
971 assert_eq!(config.sasl.mechanism, SaslMechanism::ScramSha256);
972 assert_eq!(config.sasl.username.as_deref(), Some("user-a"));
973 assert_eq!(config.sasl.password.as_deref(), Some("secret-a"));
974 }
975
976 #[test]
977 fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
978 let config = ProducerConfig::new("localhost:9093")
979 .with_sasl_plain("user-a", "secret-a")
980 .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
981 assert_eq!(config.security_protocol, SecurityProtocol::SaslSsl);
982
983 let config = ProducerConfig::new("localhost:9093")
984 .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
985 .with_sasl_plain("user-a", "secret-a");
986 assert_eq!(config.security_protocol, SecurityProtocol::SaslSsl);
987 }
988
989 #[test]
990 fn producer_multi_broker_with_bootstrap_servers() {
991 let config = ProducerConfig::new("host1:9092").with_bootstrap_servers([
992 "host1:9092",
993 "host2:9092",
994 "host3:9092",
995 ]);
996 assert_eq!(
997 config.bootstrap_servers,
998 vec!["host1:9092", "host2:9092", "host3:9092"]
999 );
1000 }
1001
1002 #[test]
1003 fn producer_builder_records_tuning_and_transaction_options() {
1004 let config = ProducerConfig::new("localhost:9092")
1005 .with_client_id("producer-a")
1006 .with_security_protocol(SecurityProtocol::Ssl)
1007 .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
1008 .with_tls_server_name("kafka.internal")
1009 .with_acks(-1)
1010 .with_enable_idempotence(true)
1011 .with_partitioner(ProducerPartitioner::Default)
1012 .with_compression(ProducerCompression::Zstd)
1013 .with_batch_size(0)
1014 .with_linger(Duration::from_millis(50))
1015 .with_delivery_timeout(Duration::from_secs(10))
1016 .with_request_timeout(Duration::from_secs(2))
1017 .with_metadata_max_age(Duration::from_secs(60))
1018 .with_retry_backoff(Duration::from_millis(75))
1019 .with_max_retries(7)
1020 .with_transactional_id("tx-a")
1021 .with_transaction_timeout(Duration::from_secs(45));
1022
1023 assert_eq!(config.client_id, "producer-a");
1024 assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
1025 assert_eq!(
1026 config.tls.client_cert_path.as_deref(),
1027 Some(std::path::Path::new("/tmp/client.crt"))
1028 );
1029 assert_eq!(
1030 config.tls.client_key_path.as_deref(),
1031 Some(std::path::Path::new("/tmp/client.key"))
1032 );
1033 assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));
1034 assert_eq!(config.acks, -1);
1035 assert!(config.is_idempotent());
1036 assert_eq!(config.compression, ProducerCompression::Zstd);
1037 assert_eq!(config.batch_size, 1);
1038 assert_eq!(config.linger, Duration::from_millis(50));
1039 assert_eq!(config.delivery_timeout, Duration::from_secs(10));
1040 assert_eq!(config.request_timeout, Duration::from_secs(2));
1041 assert_eq!(config.metadata_max_age, Duration::from_secs(60));
1042 assert_eq!(config.retry_backoff, Duration::from_millis(75));
1043 assert_eq!(config.max_retries, 7);
1044 assert_eq!(config.transactional_id.as_deref(), Some("tx-a"));
1045 assert_eq!(config.transaction_timeout, Duration::from_secs(45));
1046 assert!(config.is_transactional());
1047 }
1048
1049 #[test]
1050 fn consumer_tls_builder_keeps_override_server_name() {
1051 let config = ConsumerConfig::new("localhost:9093", "group-a")
1052 .with_tls(TlsConfig::new().with_server_name("kafka.internal"));
1053 assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
1054 assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));
1055 }
1056
1057 #[test]
1058 fn consumer_multi_broker_with_bootstrap_servers() {
1059 let config = ConsumerConfig::new("host1:9092", "group-a")
1060 .with_bootstrap_servers(["host1:9092", "host2:9092"]);
1061 assert_eq!(config.bootstrap_servers, vec!["host1:9092", "host2:9092"]);
1062 }
1063
1064 #[test]
1065 fn admin_tls_builder_records_client_auth_paths() {
1066 let config = AdminConfig::new("localhost:9093")
1067 .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key");
1068 assert_eq!(config.security_protocol, SecurityProtocol::Ssl);
1069 assert_eq!(
1070 config.tls.client_cert_path.as_deref(),
1071 Some(std::path::Path::new("/tmp/client.crt"))
1072 );
1073 assert_eq!(
1074 config.tls.client_key_path.as_deref(),
1075 Some(std::path::Path::new("/tmp/client.key"))
1076 );
1077 }
1078
1079 #[test]
1080 fn admin_and_consumer_builders_record_remaining_options() {
1081 let admin = AdminConfig::new("localhost:9092")
1082 .with_client_id("admin-a")
1083 .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
1084 .with_security_protocol(SecurityProtocol::Plaintext)
1085 .with_tls(TlsConfig::new().with_ca_cert_path("/tmp/ca.pem"))
1086 .with_sasl_scram_sha_512("user-a", "secret-a")
1087 .with_request_timeout(Duration::from_secs(8));
1088
1089 assert_eq!(admin.bootstrap_servers, vec!["host-a:9092", "host-b:9092"]);
1090 assert_eq!(admin.client_id, "admin-a");
1091 assert_eq!(admin.security_protocol, SecurityProtocol::SaslSsl);
1092 assert_eq!(
1093 admin.tls.ca_cert_path.as_deref(),
1094 Some(std::path::Path::new("/tmp/ca.pem"))
1095 );
1096 assert_eq!(admin.sasl.mechanism, SaslMechanism::ScramSha512);
1097 assert_eq!(admin.request_timeout, Duration::from_secs(8));
1098
1099 let consumer = ConsumerConfig::new("localhost:9092", "group-a")
1100 .with_client_id("consumer-a")
1101 .with_security_protocol(SecurityProtocol::Ssl)
1102 .with_tls_ca_cert_path("/tmp/ca.pem")
1103 .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
1104 .with_sasl_scram_sha_512("user-b", "secret-b")
1105 .with_request_timeout(Duration::from_secs(3))
1106 .with_metadata_max_age(Duration::from_secs(4))
1107 .with_retry_backoff(Duration::from_millis(5))
1108 .with_max_retries(6)
1109 .with_rebalance_timeout(Duration::from_secs(7))
1110 .with_fetch_max_wait(Duration::from_millis(8))
1111 .with_fetch_min_bytes(9)
1112 .with_fetch_max_bytes(10)
1113 .with_partition_max_bytes(11)
1114 .with_auto_offset_reset(AutoOffsetReset::Latest)
1115 .with_isolation_level(IsolationLevel::ReadCommitted)
1116 .with_enable_auto_commit(true)
1117 .with_auto_commit_interval(Duration::from_secs(12))
1118 .with_server_assignor("range")
1119 .with_rack_id("rack-a")
1120 .with_instance_id("instance-a");
1121
1122 assert_eq!(consumer.client_id, "consumer-a");
1123 assert_eq!(consumer.security_protocol, SecurityProtocol::SaslSsl);
1124 assert_eq!(consumer.sasl.mechanism, SaslMechanism::ScramSha512);
1125 assert_eq!(consumer.request_timeout, Duration::from_secs(3));
1126 assert_eq!(consumer.metadata_max_age, Duration::from_secs(4));
1127 assert_eq!(consumer.retry_backoff, Duration::from_millis(5));
1128 assert_eq!(consumer.max_retries, 6);
1129 assert_eq!(consumer.rebalance_timeout, Duration::from_secs(7));
1130 assert_eq!(consumer.fetch_max_wait, Duration::from_millis(8));
1131 assert_eq!(consumer.fetch_min_bytes, 9);
1132 assert_eq!(consumer.fetch_max_bytes, 10);
1133 assert_eq!(consumer.partition_max_bytes, 11);
1134 assert_eq!(consumer.auto_offset_reset, AutoOffsetReset::Latest);
1135 assert_eq!(consumer.isolation_level, IsolationLevel::ReadCommitted);
1136 assert!(consumer.enable_auto_commit);
1137 assert_eq!(consumer.auto_commit_interval, Duration::from_secs(12));
1138 assert_eq!(consumer.server_assignor.as_deref(), Some("range"));
1139 assert_eq!(consumer.rack_id.as_deref(), Some("rack-a"));
1140 assert_eq!(consumer.instance_id.as_deref(), Some("instance-a"));
1141 }
1142}