Skip to main content

kafkit_client/core/
config.rs

//! Configuration types used to build clients.
//!
//! The direct config structs are useful when you want full control. For
//! day-to-day code, [`KafkaClient`](crate::KafkaClient) and its builders are a
//! little more convenient.
//!
//! ```no_run
//! use std::time::Duration;
//! use kafkit_client::{ProducerConfig, SecurityProtocol};
//!
//! let config = ProducerConfig::new("localhost:9092")
//!     .with_client_id("orders-writer")
//!     .with_security_protocol(SecurityProtocol::Plaintext)
//!     .with_linger(Duration::from_millis(5));
//! ```
17use std::path::PathBuf;
18use std::time::Duration;
19
20use kafka_protocol::records::Compression;
21
22use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
23
/// Security protocol used for broker connections.
///
/// The four variants cover every combination of two switches: whether the
/// protocol uses TLS and whether it uses SASL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL.
    Plaintext,
    /// TLS without SASL.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL together with TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns `true` for the TLS variants (`Ssl` and `SaslSsl`).
    pub fn uses_tls(self) -> bool {
        self == Self::Ssl || self == Self::SaslSsl
    }

    /// Returns `true` for the SASL variants (`SaslPlaintext` and `SaslSsl`).
    pub fn uses_sasl(self) -> bool {
        self == Self::SaslPlaintext || self == Self::SaslSsl
    }

    /// Returns the SASL-enabled counterpart of this protocol.
    ///
    /// TLS variants map to `SaslSsl`; non-TLS variants map to `SaslPlaintext`.
    fn with_sasl(self) -> Self {
        match self {
            Self::Ssl | Self::SaslSsl => Self::SaslSsl,
            Self::Plaintext | Self::SaslPlaintext => Self::SaslPlaintext,
        }
    }
}
56
/// SASL authentication mechanism. `Plain` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SaslMechanism {
    /// The `PLAIN` mechanism.
    #[default]
    Plain,
    /// The `SCRAM-SHA-256` mechanism.
    ScramSha256,
    /// The `SCRAM-SHA-512` mechanism.
    ScramSha512,
}

impl SaslMechanism {
    /// Returns the upper-case mechanism name (`"PLAIN"`, `"SCRAM-SHA-256"`
    /// or `"SCRAM-SHA-512"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::Plain => "PLAIN",
        }
    }

    /// Returns `true` for the SCRAM mechanisms, i.e. exactly those that have
    /// a [`scram_type`](Self::scram_type).
    pub fn is_scram(self) -> bool {
        self.scram_type().is_some()
    }

    /// Returns the numeric SCRAM type: `1` for SHA-256, `2` for SHA-512 and
    /// `None` for `Plain`.
    ///
    /// NOTE(review): presumably the Kafka protocol's SCRAM mechanism code;
    /// confirm against the call sites.
    pub fn scram_type(self) -> Option<i8> {
        let code = match self {
            Self::Plain => return None,
            Self::ScramSha256 => 1,
            Self::ScramSha512 => 2,
        };
        Some(code)
    }
}
93
94#[derive(Debug, Clone, Default, PartialEq, Eq)]
95/// Sasl Config.
96pub struct SaslConfig {
97    /// Mechanism.
98    pub mechanism: SaslMechanism,
99    /// Username.
100    pub username: Option<String>,
101    /// Password.
102    pub password: Option<String>,
103    /// Authorization Id.
104    pub authorization_id: Option<String>,
105}
106
107impl SaslConfig {
108    /// Plain.
109    pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
110        Self {
111            mechanism: SaslMechanism::Plain,
112            username: Some(username.into()),
113            password: Some(password.into()),
114            authorization_id: None,
115        }
116    }
117
118    /// Scram Sha 256.
119    pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
120        Self {
121            mechanism: SaslMechanism::ScramSha256,
122            username: Some(username.into()),
123            password: Some(password.into()),
124            authorization_id: None,
125        }
126    }
127
128    /// Scram Sha 512.
129    pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
130        Self {
131            mechanism: SaslMechanism::ScramSha512,
132            username: Some(username.into()),
133            password: Some(password.into()),
134            authorization_id: None,
135        }
136    }
137
138    /// Sets authorization id and returns the updated value.
139    pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
140        self.authorization_id = Some(authorization_id.into());
141        self
142    }
143}
144
/// TLS settings. Every field is optional and starts out as `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// CA certificate path, if one was configured.
    pub ca_cert_path: Option<PathBuf>,
    /// Client certificate path, if one was configured.
    pub client_cert_path: Option<PathBuf>,
    /// Client key path, if one was configured.
    pub client_key_path: Option<PathBuf>,
    /// Server name, if one was configured.
    pub server_name: Option<String>,
}

impl TlsConfig {
    /// Creates an empty TLS config (same as `TlsConfig::default()`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the CA certificate path and returns the updated value.
    pub fn with_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the client certificate path and returns the updated value.
    pub fn with_client_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the client key path and returns the updated value.
    pub fn with_client_key_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_key_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the server name and returns the updated value.
    pub fn with_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
188
/// Partitioning strategy selected for a producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerPartitioner {
    /// The default strategy (currently the only variant).
    Default,
}
195
/// Compression codec selected for a producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerCompression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Snappy compression.
    Snappy,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
}
210
211impl From<ProducerCompression> for Compression {
212    fn from(value: ProducerCompression) -> Self {
213        match value {
214            ProducerCompression::None => Compression::None,
215            ProducerCompression::Gzip => Compression::Gzip,
216            ProducerCompression::Snappy => Compression::Snappy,
217            ProducerCompression::Lz4 => Compression::Lz4,
218            ProducerCompression::Zstd => Compression::Zstd,
219        }
220    }
221}
222
223#[derive(Debug, Clone)]
224/// Producer Config.
225pub struct ProducerConfig {
226    /// Bootstrap Servers.
227    pub bootstrap_servers: Vec<String>,
228    /// Client Id.
229    pub client_id: String,
230    /// Security Protocol.
231    pub security_protocol: SecurityProtocol,
232    /// Tls.
233    pub tls: TlsConfig,
234    /// Sasl.
235    pub sasl: SaslConfig,
236    /// Acks.
237    pub acks: i16,
238    /// Enable Idempotence.
239    pub enable_idempotence: bool,
240    /// Partitioner.
241    pub partitioner: ProducerPartitioner,
242    /// Compression.
243    pub compression: ProducerCompression,
244    /// Batch Size.
245    pub batch_size: usize,
246    /// Linger.
247    pub linger: Duration,
248    /// Delivery Timeout.
249    pub delivery_timeout: Duration,
250    /// Request Timeout.
251    pub request_timeout: Duration,
252    /// Metadata Max Age.
253    pub metadata_max_age: Duration,
254    /// Retry Backoff.
255    pub retry_backoff: Duration,
256    /// Max Retries.
257    pub max_retries: usize,
258    /// Max In Flight Requests Per Connection.
259    pub max_in_flight_requests_per_connection: usize,
260    /// Transactional Id.
261    pub transactional_id: Option<String>,
262    /// Transaction Timeout.
263    pub transaction_timeout: Duration,
264}
265
266impl ProducerConfig {
267    /// Creates a new value.
268    pub fn new(bootstrap_server: impl Into<String>) -> Self {
269        Self {
270            bootstrap_servers: vec![bootstrap_server.into()],
271            client_id: "rust-producer".to_owned(),
272            security_protocol: SecurityProtocol::Plaintext,
273            tls: TlsConfig::default(),
274            sasl: SaslConfig::default(),
275            acks: 1,
276            enable_idempotence: false,
277            partitioner: ProducerPartitioner::Default,
278            compression: ProducerCompression::None,
279            batch_size: 16 * 1024,
280            linger: Duration::ZERO,
281            delivery_timeout: Duration::from_secs(120),
282            request_timeout: Duration::from_secs(5),
283            metadata_max_age: Duration::from_secs(30),
284            retry_backoff: Duration::from_millis(250),
285            max_retries: 3,
286            max_in_flight_requests_per_connection: 5,
287            transactional_id: None,
288            transaction_timeout: Duration::from_secs(30),
289        }
290    }
291
292    /// Sets client id and returns the updated value.
293    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
294        self.client_id = client_id.into();
295        self
296    }
297
298    /// Sets bootstrap servers and returns the updated value.
299    pub fn with_bootstrap_servers(
300        mut self,
301        servers: impl IntoIterator<Item = impl Into<String>>,
302    ) -> Self {
303        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
304        self
305    }
306
307    /// Sets security protocol and returns the updated value.
308    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
309        self.security_protocol = security_protocol;
310        self
311    }
312
313    /// Sets tls and returns the updated value.
314    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
315        self.security_protocol = if self.security_protocol.uses_sasl() {
316            SecurityProtocol::SaslSsl
317        } else {
318            SecurityProtocol::Ssl
319        };
320        self.tls = tls;
321        self
322    }
323
324    /// Sets tls ca cert path and returns the updated value.
325    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
326        self.security_protocol = if self.security_protocol.uses_sasl() {
327            SecurityProtocol::SaslSsl
328        } else {
329            SecurityProtocol::Ssl
330        };
331        self.tls = self.tls.with_ca_cert_path(path);
332        self
333    }
334
335    /// Sets tls client auth paths and returns the updated value.
336    pub fn with_tls_client_auth_paths(
337        mut self,
338        cert_path: impl Into<PathBuf>,
339        key_path: impl Into<PathBuf>,
340    ) -> Self {
341        self.security_protocol = if self.security_protocol.uses_sasl() {
342            SecurityProtocol::SaslSsl
343        } else {
344            SecurityProtocol::Ssl
345        };
346        self.tls = self
347            .tls
348            .with_client_cert_path(cert_path)
349            .with_client_key_path(key_path);
350        self
351    }
352
353    /// Sets tls server name and returns the updated value.
354    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
355        self.security_protocol = if self.security_protocol.uses_sasl() {
356            SecurityProtocol::SaslSsl
357        } else {
358            SecurityProtocol::Ssl
359        };
360        self.tls = self.tls.with_server_name(server_name);
361        self
362    }
363
364    /// Sets sasl and returns the updated value.
365    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
366        self.security_protocol = self.security_protocol.with_sasl();
367        self.sasl = sasl;
368        self
369    }
370
371    /// Sets sasl plain and returns the updated value.
372    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
373        self.with_sasl(SaslConfig::plain(username, password))
374    }
375
376    /// Sets sasl scram sha 256 and returns the updated value.
377    pub fn with_sasl_scram_sha_256(
378        self,
379        username: impl Into<String>,
380        password: impl Into<String>,
381    ) -> Self {
382        self.with_sasl(SaslConfig::scram_sha_256(username, password))
383    }
384
385    /// Sets sasl scram sha 512 and returns the updated value.
386    pub fn with_sasl_scram_sha_512(
387        self,
388        username: impl Into<String>,
389        password: impl Into<String>,
390    ) -> Self {
391        self.with_sasl(SaslConfig::scram_sha_512(username, password))
392    }
393
394    /// Sets acks and returns the updated value.
395    pub fn with_acks(mut self, acks: i16) -> Self {
396        self.acks = acks;
397        self
398    }
399
400    /// Sets enable idempotence and returns the updated value.
401    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
402        self.enable_idempotence = enable_idempotence;
403        if enable_idempotence {
404            self.acks = -1;
405            self.max_retries = self.max_retries.max(1);
406        }
407        self
408    }
409
410    /// Sets partitioner and returns the updated value.
411    pub fn with_partitioner(mut self, partitioner: ProducerPartitioner) -> Self {
412        self.partitioner = partitioner;
413        self
414    }
415
416    /// Sets compression and returns the updated value.
417    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
418        self.compression = compression;
419        self
420    }
421
422    /// Sets batch size and returns the updated value.
423    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
424        self.batch_size = batch_size.max(1);
425        self
426    }
427
428    /// Sets linger and returns the updated value.
429    pub fn with_linger(mut self, linger: Duration) -> Self {
430        self.linger = linger;
431        self
432    }
433
434    /// Sets delivery timeout and returns the updated value.
435    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
436        self.delivery_timeout = delivery_timeout;
437        self
438    }
439
440    /// Sets request timeout and returns the updated value.
441    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
442        self.request_timeout = request_timeout;
443        self
444    }
445
446    /// Sets metadata max age and returns the updated value.
447    pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
448        self.metadata_max_age = metadata_max_age;
449        self
450    }
451
452    /// Sets retry backoff and returns the updated value.
453    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
454        self.retry_backoff = retry_backoff;
455        self
456    }
457
458    /// Sets max retries and returns the updated value.
459    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
460        self.max_retries = max_retries;
461        self
462    }
463
464    /// Sets max in-flight requests per broker connection and returns the updated value.
465    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
466        self.max_in_flight_requests_per_connection = max_in_flight.max(1);
467        self
468    }
469
470    /// Sets transactional id and returns the updated value.
471    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
472        self.transactional_id = Some(transactional_id.into());
473        self.acks = -1;
474        self.enable_idempotence = true;
475        self
476    }
477
478    /// Sets transaction timeout and returns the updated value.
479    pub fn with_transaction_timeout(mut self, transaction_timeout: Duration) -> Self {
480        self.transaction_timeout = transaction_timeout;
481        self
482    }
483
484    /// Returns whether transactional.
485    pub fn is_transactional(&self) -> bool {
486        self.transactional_id.is_some()
487    }
488
489    /// Returns whether idempotent.
490    pub fn is_idempotent(&self) -> bool {
491        self.enable_idempotence
492    }
493}
494
495/// Alias for client config.
496pub type ClientConfig = ProducerConfig;
497
498#[derive(Debug, Clone)]
499/// Admin Config.
500pub struct AdminConfig {
501    /// Bootstrap Servers.
502    pub bootstrap_servers: Vec<String>,
503    /// Client Id.
504    pub client_id: String,
505    /// Security Protocol.
506    pub security_protocol: SecurityProtocol,
507    /// Tls.
508    pub tls: TlsConfig,
509    /// Sasl.
510    pub sasl: SaslConfig,
511    /// Request Timeout.
512    pub request_timeout: Duration,
513}
514
515impl AdminConfig {
516    /// Creates a new value.
517    pub fn new(bootstrap_server: impl Into<String>) -> Self {
518        Self {
519            bootstrap_servers: vec![bootstrap_server.into()],
520            client_id: "rust-admin".to_owned(),
521            security_protocol: SecurityProtocol::Plaintext,
522            tls: TlsConfig::default(),
523            sasl: SaslConfig::default(),
524            request_timeout: Duration::from_secs(5),
525        }
526    }
527
528    /// Sets client id and returns the updated value.
529    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
530        self.client_id = client_id.into();
531        self
532    }
533
534    /// Sets bootstrap servers and returns the updated value.
535    pub fn with_bootstrap_servers(
536        mut self,
537        servers: impl IntoIterator<Item = impl Into<String>>,
538    ) -> Self {
539        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
540        self
541    }
542
543    /// Sets security protocol and returns the updated value.
544    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
545        self.security_protocol = security_protocol;
546        self
547    }
548
549    /// Sets tls and returns the updated value.
550    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
551        self.security_protocol = if self.security_protocol.uses_sasl() {
552            SecurityProtocol::SaslSsl
553        } else {
554            SecurityProtocol::Ssl
555        };
556        self.tls = tls;
557        self
558    }
559
560    /// Sets tls ca cert path and returns the updated value.
561    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
562        self.security_protocol = if self.security_protocol.uses_sasl() {
563            SecurityProtocol::SaslSsl
564        } else {
565            SecurityProtocol::Ssl
566        };
567        self.tls = self.tls.with_ca_cert_path(path);
568        self
569    }
570
571    /// Sets tls client auth paths and returns the updated value.
572    pub fn with_tls_client_auth_paths(
573        mut self,
574        cert_path: impl Into<PathBuf>,
575        key_path: impl Into<PathBuf>,
576    ) -> Self {
577        self.security_protocol = if self.security_protocol.uses_sasl() {
578            SecurityProtocol::SaslSsl
579        } else {
580            SecurityProtocol::Ssl
581        };
582        self.tls = self
583            .tls
584            .with_client_cert_path(cert_path)
585            .with_client_key_path(key_path);
586        self
587    }
588
589    /// Sets tls server name and returns the updated value.
590    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
591        self.security_protocol = if self.security_protocol.uses_sasl() {
592            SecurityProtocol::SaslSsl
593        } else {
594            SecurityProtocol::Ssl
595        };
596        self.tls = self.tls.with_server_name(server_name);
597        self
598    }
599
600    /// Sets sasl and returns the updated value.
601    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
602        self.security_protocol = self.security_protocol.with_sasl();
603        self.sasl = sasl;
604        self
605    }
606
607    /// Sets sasl plain and returns the updated value.
608    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
609        self.with_sasl(SaslConfig::plain(username, password))
610    }
611
612    /// Sets sasl scram sha 256 and returns the updated value.
613    pub fn with_sasl_scram_sha_256(
614        self,
615        username: impl Into<String>,
616        password: impl Into<String>,
617    ) -> Self {
618        self.with_sasl(SaslConfig::scram_sha_256(username, password))
619    }
620
621    /// Sets sasl scram sha 512 and returns the updated value.
622    pub fn with_sasl_scram_sha_512(
623        self,
624        username: impl Into<String>,
625        password: impl Into<String>,
626    ) -> Self {
627        self.with_sasl(SaslConfig::scram_sha_512(username, password))
628    }
629
630    /// Sets request timeout and returns the updated value.
631    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
632        self.request_timeout = request_timeout;
633        self
634    }
635}
636
/// Offset reset policy selected for a consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Reset to the earliest offset.
    Earliest,
    /// Reset to the latest offset.
    Latest,
}
645
646#[derive(Debug, Clone, Copy, PartialEq, Eq)]
647/// Isolation Level.
648pub enum IsolationLevel {
649    /// Read uncommitted.
650    ReadUncommitted,
651    /// Read committed.
652    ReadCommitted,
653}
654
655impl IsolationLevel {
656    /// As Protocol Value.
657    pub fn as_protocol_value(self) -> i8 {
658        match self {
659            Self::ReadUncommitted => READ_UNCOMMITTED,
660            Self::ReadCommitted => READ_COMMITTED,
661        }
662    }
663}
664
665#[derive(Debug, Clone)]
666/// Consumer Config.
667pub struct ConsumerConfig {
668    /// Bootstrap Servers.
669    pub bootstrap_servers: Vec<String>,
670    /// Client Id.
671    pub client_id: String,
672    /// Group Id.
673    pub group_id: String,
674    /// Security Protocol.
675    pub security_protocol: SecurityProtocol,
676    /// Tls.
677    pub tls: TlsConfig,
678    /// Sasl.
679    pub sasl: SaslConfig,
680    /// Request Timeout.
681    pub request_timeout: Duration,
682    /// Metadata Max Age.
683    pub metadata_max_age: Duration,
684    /// Retry Backoff.
685    pub retry_backoff: Duration,
686    /// Max Retries.
687    pub max_retries: usize,
688    /// Rebalance Timeout.
689    pub rebalance_timeout: Duration,
690    /// Fetch Max Wait.
691    pub fetch_max_wait: Duration,
692    /// Fetch Min Bytes.
693    pub fetch_min_bytes: i32,
694    /// Fetch Max Bytes.
695    pub fetch_max_bytes: i32,
696    /// Partition Max Bytes.
697    pub partition_max_bytes: i32,
698    /// Auto Offset Reset.
699    pub auto_offset_reset: AutoOffsetReset,
700    /// Isolation Level.
701    pub isolation_level: IsolationLevel,
702    /// Enable Auto Commit.
703    pub enable_auto_commit: bool,
704    /// Auto Commit Interval.
705    pub auto_commit_interval: Duration,
706    /// Server Assignor.
707    pub server_assignor: Option<String>,
708    /// Rack Id.
709    pub rack_id: Option<String>,
710    /// Instance Id.
711    pub instance_id: Option<String>,
712}
713
714impl ConsumerConfig {
715    /// Creates a new value.
716    pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
717        Self {
718            bootstrap_servers: vec![bootstrap_server.into()],
719            client_id: "rust-consumer".to_owned(),
720            group_id: group_id.into(),
721            security_protocol: SecurityProtocol::Plaintext,
722            tls: TlsConfig::default(),
723            sasl: SaslConfig::default(),
724            request_timeout: Duration::from_secs(5),
725            metadata_max_age: Duration::from_secs(30),
726            retry_backoff: Duration::from_millis(250),
727            max_retries: 3,
728            rebalance_timeout: Duration::from_secs(30),
729            fetch_max_wait: Duration::from_millis(500),
730            fetch_min_bytes: 1,
731            fetch_max_bytes: 50 * 1024 * 1024,
732            partition_max_bytes: 1024 * 1024,
733            auto_offset_reset: AutoOffsetReset::Earliest,
734            isolation_level: IsolationLevel::ReadUncommitted,
735            enable_auto_commit: false,
736            auto_commit_interval: Duration::from_secs(5),
737            server_assignor: None,
738            rack_id: None,
739            instance_id: None,
740        }
741    }
742
743    /// Sets client id and returns the updated value.
744    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
745        self.client_id = client_id.into();
746        self
747    }
748
749    /// Sets bootstrap servers and returns the updated value.
750    pub fn with_bootstrap_servers(
751        mut self,
752        servers: impl IntoIterator<Item = impl Into<String>>,
753    ) -> Self {
754        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
755        self
756    }
757
758    /// Sets security protocol and returns the updated value.
759    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
760        self.security_protocol = security_protocol;
761        self
762    }
763
764    /// Sets tls and returns the updated value.
765    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
766        self.security_protocol = if self.security_protocol.uses_sasl() {
767            SecurityProtocol::SaslSsl
768        } else {
769            SecurityProtocol::Ssl
770        };
771        self.tls = tls;
772        self
773    }
774
775    /// Sets tls ca cert path and returns the updated value.
776    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
777        self.security_protocol = if self.security_protocol.uses_sasl() {
778            SecurityProtocol::SaslSsl
779        } else {
780            SecurityProtocol::Ssl
781        };
782        self.tls = self.tls.with_ca_cert_path(path);
783        self
784    }
785
786    /// Sets tls client auth paths and returns the updated value.
787    pub fn with_tls_client_auth_paths(
788        mut self,
789        cert_path: impl Into<PathBuf>,
790        key_path: impl Into<PathBuf>,
791    ) -> Self {
792        self.security_protocol = if self.security_protocol.uses_sasl() {
793            SecurityProtocol::SaslSsl
794        } else {
795            SecurityProtocol::Ssl
796        };
797        self.tls = self
798            .tls
799            .with_client_cert_path(cert_path)
800            .with_client_key_path(key_path);
801        self
802    }
803
804    /// Sets tls server name and returns the updated value.
805    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
806        self.security_protocol = if self.security_protocol.uses_sasl() {
807            SecurityProtocol::SaslSsl
808        } else {
809            SecurityProtocol::Ssl
810        };
811        self.tls = self.tls.with_server_name(server_name);
812        self
813    }
814
815    /// Sets sasl and returns the updated value.
816    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
817        self.security_protocol = self.security_protocol.with_sasl();
818        self.sasl = sasl;
819        self
820    }
821
822    /// Sets sasl plain and returns the updated value.
823    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
824        self.with_sasl(SaslConfig::plain(username, password))
825    }
826
827    /// Sets sasl scram sha 256 and returns the updated value.
828    pub fn with_sasl_scram_sha_256(
829        self,
830        username: impl Into<String>,
831        password: impl Into<String>,
832    ) -> Self {
833        self.with_sasl(SaslConfig::scram_sha_256(username, password))
834    }
835
836    /// Sets sasl scram sha 512 and returns the updated value.
837    pub fn with_sasl_scram_sha_512(
838        self,
839        username: impl Into<String>,
840        password: impl Into<String>,
841    ) -> Self {
842        self.with_sasl(SaslConfig::scram_sha_512(username, password))
843    }
844
845    /// Sets request timeout and returns the updated value.
846    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
847        self.request_timeout = request_timeout;
848        self
849    }
850
851    /// Sets metadata max age and returns the updated value.
852    pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
853        self.metadata_max_age = metadata_max_age;
854        self
855    }
856
857    /// Sets retry backoff and returns the updated value.
858    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
859        self.retry_backoff = retry_backoff;
860        self
861    }
862
863    /// Sets max retries and returns the updated value.
864    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
865        self.max_retries = max_retries;
866        self
867    }
868
869    /// Sets rebalance timeout and returns the updated value.
870    pub fn with_rebalance_timeout(mut self, rebalance_timeout: Duration) -> Self {
871        self.rebalance_timeout = rebalance_timeout;
872        self
873    }
874
875    /// Sets fetch max wait and returns the updated value.
876    pub fn with_fetch_max_wait(mut self, fetch_max_wait: Duration) -> Self {
877        self.fetch_max_wait = fetch_max_wait;
878        self
879    }
880
881    /// Sets fetch min bytes and returns the updated value.
882    pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
883        self.fetch_min_bytes = fetch_min_bytes;
884        self
885    }
886
887    /// Sets fetch max bytes and returns the updated value.
888    pub fn with_fetch_max_bytes(mut self, fetch_max_bytes: i32) -> Self {
889        self.fetch_max_bytes = fetch_max_bytes;
890        self
891    }
892
893    /// Sets partition max bytes and returns the updated value.
894    pub fn with_partition_max_bytes(mut self, partition_max_bytes: i32) -> Self {
895        self.partition_max_bytes = partition_max_bytes;
896        self
897    }
898
899    /// Sets auto offset reset and returns the updated value.
900    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
901        self.auto_offset_reset = auto_offset_reset;
902        self
903    }
904
905    /// Sets isolation level and returns the updated value.
906    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
907        self.isolation_level = isolation_level;
908        self
909    }
910
911    /// Sets enable auto commit and returns the updated value.
912    pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
913        self.enable_auto_commit = enable_auto_commit;
914        self
915    }
916
917    /// Sets auto commit interval and returns the updated value.
918    pub fn with_auto_commit_interval(mut self, auto_commit_interval: Duration) -> Self {
919        self.auto_commit_interval = auto_commit_interval;
920        self
921    }
922
923    /// Sets server assignor and returns the updated value.
924    pub fn with_server_assignor(mut self, server_assignor: impl Into<String>) -> Self {
925        self.server_assignor = Some(server_assignor.into());
926        self
927    }
928
929    /// Sets rack id and returns the updated value.
930    pub fn with_rack_id(mut self, rack_id: impl Into<String>) -> Self {
931        self.rack_id = Some(rack_id.into());
932        self
933    }
934
935    /// Sets instance id and returns the updated value.
936    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
937        self.instance_id = Some(instance_id.into());
938        self
939    }
940}
941
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    /// Supplying only a CA certificate path is expected to switch a producer
    /// config to `Ssl` and to record that path.
    #[test]
    fn producer_tls_builder_enables_ssl() {
        let ca_path = "/tmp/cluster-ca.pem";
        let cfg = ProducerConfig::new("localhost:9093").with_tls_ca_cert_path(ca_path);

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(cfg.tls.ca_cert_path.as_deref(), Some(Path::new(ca_path)));
    }

    /// SASL/PLAIN credentials without TLS are expected to yield
    /// `SaslPlaintext` with the `Plain` mechanism and both credentials stored.
    #[test]
    fn producer_sasl_plain_builder_enables_sasl_plaintext() {
        let (user, secret) = ("user-a", "secret-a");
        let cfg = ProducerConfig::new("localhost:9092").with_sasl_plain(user, secret);

        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(cfg.sasl.mechanism, SaslMechanism::Plain);
        assert_eq!(cfg.sasl.username.as_deref(), Some(user));
        assert_eq!(cfg.sasl.password.as_deref(), Some(secret));
    }

    /// SCRAM-SHA-256 credentials without TLS are expected to yield
    /// `SaslPlaintext` with the `ScramSha256` mechanism and both credentials
    /// stored.
    #[test]
    fn producer_sasl_scram_builder_enables_sasl_plaintext() {
        let (user, secret) = ("user-a", "secret-a");
        let cfg = ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256(user, secret);

        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(cfg.sasl.mechanism, SaslMechanism::ScramSha256);
        assert_eq!(cfg.sasl.username.as_deref(), Some(user));
        assert_eq!(cfg.sasl.password.as_deref(), Some(secret));
    }

    /// Combining TLS and SASL builders is expected to end up at `SaslSsl`
    /// regardless of which of the two is applied first.
    #[test]
    fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
        // SASL first, TLS second.
        let sasl_then_tls = ProducerConfig::new("localhost:9093")
            .with_sasl_plain("user-a", "secret-a")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
        assert_eq!(sasl_then_tls.security_protocol, SecurityProtocol::SaslSsl);

        // TLS first, SASL second.
        let tls_then_sasl = ProducerConfig::new("localhost:9093")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
            .with_sasl_plain("user-a", "secret-a");
        assert_eq!(tls_then_sasl.security_protocol, SecurityProtocol::SaslSsl);
    }

    /// `with_bootstrap_servers` on a producer is expected to leave exactly the
    /// supplied broker list, in order.
    #[test]
    fn producer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092", "host3:9092"];
        let cfg = ProducerConfig::new("host1:9092").with_bootstrap_servers(brokers);

        assert_eq!(cfg.bootstrap_servers, brokers.to_vec());
    }

    /// Exercises the producer tuning and transaction builders in one chain and
    /// checks the recorded values. Note that a requested batch size of `0` is
    /// expected to be stored as `1`.
    #[test]
    fn producer_builder_records_tuning_and_transaction_options() {
        let (cert, key) = ("/tmp/client.crt", "/tmp/client.key");
        let linger = Duration::from_millis(50);
        let delivery_timeout = Duration::from_secs(10);
        let request_timeout = Duration::from_secs(2);
        let metadata_max_age = Duration::from_secs(60);
        let retry_backoff = Duration::from_millis(75);
        let transaction_timeout = Duration::from_secs(45);

        let cfg = ProducerConfig::new("localhost:9092")
            .with_client_id("producer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_client_auth_paths(cert, key)
            .with_tls_server_name("kafka.internal")
            .with_acks(-1)
            .with_enable_idempotence(true)
            .with_partitioner(ProducerPartitioner::Default)
            .with_compression(ProducerCompression::Zstd)
            .with_batch_size(0)
            .with_linger(linger)
            .with_delivery_timeout(delivery_timeout)
            .with_request_timeout(request_timeout)
            .with_metadata_max_age(metadata_max_age)
            .with_retry_backoff(retry_backoff)
            .with_max_retries(7)
            .with_transactional_id("tx-a")
            .with_transaction_timeout(transaction_timeout);

        // Identity and transport security.
        assert_eq!(cfg.client_id, "producer-a");
        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(cfg.tls.client_cert_path.as_deref(), Some(Path::new(cert)));
        assert_eq!(cfg.tls.client_key_path.as_deref(), Some(Path::new(key)));
        assert_eq!(cfg.tls.server_name.as_deref(), Some("kafka.internal"));

        // Delivery semantics and batching.
        assert_eq!(cfg.acks, -1);
        assert!(cfg.is_idempotent());
        assert_eq!(cfg.compression, ProducerCompression::Zstd);
        assert_eq!(cfg.batch_size, 1);
        assert_eq!(cfg.linger, linger);

        // Timeouts and retry behaviour.
        assert_eq!(cfg.delivery_timeout, delivery_timeout);
        assert_eq!(cfg.request_timeout, request_timeout);
        assert_eq!(cfg.metadata_max_age, metadata_max_age);
        assert_eq!(cfg.retry_backoff, retry_backoff);
        assert_eq!(cfg.max_retries, 7);

        // Transactions.
        assert_eq!(cfg.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(cfg.transaction_timeout, transaction_timeout);
        assert!(cfg.is_transactional());
    }

    /// Passing a whole `TlsConfig` to a consumer is expected to enable `Ssl`
    /// and keep the server-name override carried by that `TlsConfig`.
    #[test]
    fn consumer_tls_builder_keeps_override_server_name() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let cfg = ConsumerConfig::new("localhost:9093", "group-a").with_tls(tls);

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(cfg.tls.server_name.as_deref(), Some("kafka.internal"));
    }

    /// `with_bootstrap_servers` on a consumer is expected to leave exactly the
    /// supplied broker list, in order.
    #[test]
    fn consumer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092"];
        let cfg = ConsumerConfig::new("host1:9092", "group-a").with_bootstrap_servers(brokers);

        assert_eq!(cfg.bootstrap_servers, brokers.to_vec());
    }

    /// Client-auth certificate and key paths on an admin config are expected
    /// to enable `Ssl` and be recorded under `tls`.
    #[test]
    fn admin_tls_builder_records_client_auth_paths() {
        let (cert, key) = ("/tmp/client.crt", "/tmp/client.key");
        let cfg = AdminConfig::new("localhost:9093").with_tls_client_auth_paths(cert, key);

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(cfg.tls.client_cert_path.as_deref(), Some(Path::new(cert)));
        assert_eq!(cfg.tls.client_key_path.as_deref(), Some(Path::new(key)));
    }

    /// Covers the admin and consumer builders not exercised by the other
    /// tests. For the admin config, starting at `Plaintext` and then applying
    /// TLS followed by SCRAM-SHA-512 is expected to land on `SaslSsl`.
    #[test]
    fn admin_and_consumer_builders_record_remaining_options() {
        let admin_brokers = ["host-a:9092", "host-b:9092"];
        let admin_timeout = Duration::from_secs(8);

        let admin_cfg = AdminConfig::new("localhost:9092")
            .with_client_id("admin-a")
            .with_bootstrap_servers(admin_brokers)
            .with_security_protocol(SecurityProtocol::Plaintext)
            .with_tls(TlsConfig::new().with_ca_cert_path("/tmp/ca.pem"))
            .with_sasl_scram_sha_512("user-a", "secret-a")
            .with_request_timeout(admin_timeout);

        assert_eq!(admin_cfg.bootstrap_servers, admin_brokers.to_vec());
        assert_eq!(admin_cfg.client_id, "admin-a");
        assert_eq!(admin_cfg.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(
            admin_cfg.tls.ca_cert_path.as_deref(),
            Some(Path::new("/tmp/ca.pem"))
        );
        assert_eq!(admin_cfg.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(admin_cfg.request_timeout, admin_timeout);

        let request_timeout = Duration::from_secs(3);
        let metadata_max_age = Duration::from_secs(4);
        let retry_backoff = Duration::from_millis(5);
        let rebalance_timeout = Duration::from_secs(7);
        let fetch_max_wait = Duration::from_millis(8);
        let auto_commit_interval = Duration::from_secs(12);

        let consumer_cfg = ConsumerConfig::new("localhost:9092", "group-a")
            .with_client_id("consumer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_ca_cert_path("/tmp/ca.pem")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_sasl_scram_sha_512("user-b", "secret-b")
            .with_request_timeout(request_timeout)
            .with_metadata_max_age(metadata_max_age)
            .with_retry_backoff(retry_backoff)
            .with_max_retries(6)
            .with_rebalance_timeout(rebalance_timeout)
            .with_fetch_max_wait(fetch_max_wait)
            .with_fetch_min_bytes(9)
            .with_fetch_max_bytes(10)
            .with_partition_max_bytes(11)
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(auto_commit_interval)
            .with_server_assignor("range")
            .with_rack_id("rack-a")
            .with_instance_id("instance-a");

        // Identity and security.
        assert_eq!(consumer_cfg.client_id, "consumer-a");
        assert_eq!(consumer_cfg.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(consumer_cfg.sasl.mechanism, SaslMechanism::ScramSha512);

        // Timeouts and retries.
        assert_eq!(consumer_cfg.request_timeout, request_timeout);
        assert_eq!(consumer_cfg.metadata_max_age, metadata_max_age);
        assert_eq!(consumer_cfg.retry_backoff, retry_backoff);
        assert_eq!(consumer_cfg.max_retries, 6);
        assert_eq!(consumer_cfg.rebalance_timeout, rebalance_timeout);

        // Fetch sizing.
        assert_eq!(consumer_cfg.fetch_max_wait, fetch_max_wait);
        assert_eq!(consumer_cfg.fetch_min_bytes, 9);
        assert_eq!(consumer_cfg.fetch_max_bytes, 10);
        assert_eq!(consumer_cfg.partition_max_bytes, 11);

        // Offsets and commits.
        assert_eq!(consumer_cfg.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(consumer_cfg.isolation_level, IsolationLevel::ReadCommitted);
        assert!(consumer_cfg.enable_auto_commit);
        assert_eq!(consumer_cfg.auto_commit_interval, auto_commit_interval);

        // Optional group-membership identifiers.
        assert_eq!(consumer_cfg.server_assignor.as_deref(), Some("range"));
        assert_eq!(consumer_cfg.rack_id.as_deref(), Some("rack-a"));
        assert_eq!(consumer_cfg.instance_id.as_deref(), Some("instance-a"));
    }
}