Skip to main content

kafkit_client/core/
config.rs

1//! Configuration types used to build clients.
2//!
//! The direct config structs are useful when you want full control. For
//! day-to-day code, [`KafkaClient`](crate::KafkaClient) and its builders are a
//! little more convenient.
6//!
7//! ```no_run
8//! use std::time::Duration;
9//! use kafkit_client::{ProducerConfig, SecurityProtocol};
10//!
11//! let config = ProducerConfig::new("localhost:9092")
12//!     .with_client_id("orders-writer")
13//!     .with_security_protocol(SecurityProtocol::Plaintext)
14//!     .with_linger(Duration::from_millis(5));
15//! ```
16//!
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Duration;
20
21use kafka_protocol::records::Compression;
22
23use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
24use crate::network::{TcpConnector, TokioTcpConnector};
25use crate::types::ConsumerRebalanceListener;
26
/// Security protocol used for broker connections.
///
/// The four variants are the combinations of "TLS on/off" and "SASL on/off";
/// see [`SecurityProtocol::uses_tls`] and [`SecurityProtocol::uses_sasl`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityProtocol {
    /// No TLS, no SASL.
    Plaintext,
    /// TLS without SASL.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL over TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns `true` for the TLS variants (`Ssl` and `SaslSsl`).
    pub fn uses_tls(self) -> bool {
        match self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns `true` for the SASL variants (`SaslPlaintext` and `SaslSsl`).
    pub fn uses_sasl(self) -> bool {
        match self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }

    /// Returns the SASL-enabled counterpart of this protocol, leaving the TLS
    /// choice untouched.
    fn with_sasl(self) -> Self {
        match self {
            Self::Ssl | Self::SaslSsl => Self::SaslSsl,
            Self::Plaintext | Self::SaslPlaintext => Self::SaslPlaintext,
        }
    }
}
59
/// SASL authentication mechanism.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SaslMechanism {
    /// `PLAIN`; this is the default mechanism.
    #[default]
    Plain,
    /// `SCRAM-SHA-256`.
    ScramSha256,
    /// `SCRAM-SHA-512`.
    ScramSha512,
}

impl SaslMechanism {
    /// Returns the mechanism name as an upper-case static string
    /// (`"PLAIN"`, `"SCRAM-SHA-256"` or `"SCRAM-SHA-512"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::Plain => "PLAIN",
        }
    }

    /// Returns `true` for the two SCRAM mechanisms and `false` for `Plain`.
    pub fn is_scram(self) -> bool {
        // Exactly the SCRAM variants carry a numeric SCRAM type.
        self.scram_type().is_some()
    }

    /// Returns the numeric SCRAM type: `1` for SHA-256, `2` for SHA-512, and
    /// `None` for `Plain`.
    ///
    /// NOTE(review): presumably these are the Kafka protocol's SCRAM
    /// mechanism codes; confirm against the callers.
    pub fn scram_type(self) -> Option<i8> {
        match self {
            Self::ScramSha256 => Some(1),
            Self::ScramSha512 => Some(2),
            Self::Plain => None,
        }
    }
}
96
97#[derive(Debug, Clone, Default, PartialEq, Eq)]
98/// Sasl Config.
99pub struct SaslConfig {
100    /// Mechanism.
101    pub mechanism: SaslMechanism,
102    /// Username.
103    pub username: Option<String>,
104    /// Password.
105    pub password: Option<String>,
106    /// Authorization Id.
107    pub authorization_id: Option<String>,
108}
109
110impl SaslConfig {
111    /// Plain.
112    pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
113        Self {
114            mechanism: SaslMechanism::Plain,
115            username: Some(username.into()),
116            password: Some(password.into()),
117            authorization_id: None,
118        }
119    }
120
121    /// Scram Sha 256.
122    pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
123        Self {
124            mechanism: SaslMechanism::ScramSha256,
125            username: Some(username.into()),
126            password: Some(password.into()),
127            authorization_id: None,
128        }
129    }
130
131    /// Scram Sha 512.
132    pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
133        Self {
134            mechanism: SaslMechanism::ScramSha512,
135            username: Some(username.into()),
136            password: Some(password.into()),
137            authorization_id: None,
138        }
139    }
140
141    /// Sets authorization id and returns the updated value.
142    pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
143        self.authorization_id = Some(authorization_id.into());
144        self
145    }
146}
147
/// TLS settings: optional certificate/key file paths and an optional server
/// name. Every field starts out as `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// Path to the CA certificate, if set.
    pub ca_cert_path: Option<PathBuf>,
    /// Path to the client certificate, if set.
    pub client_cert_path: Option<PathBuf>,
    /// Path to the client private key, if set.
    pub client_key_path: Option<PathBuf>,
    /// Server name, if set.
    pub server_name: Option<String>,
}

impl TlsConfig {
    /// Creates an empty TLS config (same as [`TlsConfig::default`]).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the CA certificate path and returns the updated value.
    pub fn with_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the client certificate path and returns the updated value.
    pub fn with_client_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the client key path and returns the updated value.
    pub fn with_client_key_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_key_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the server name and returns the updated value.
    pub fn with_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
191
/// Partitioning strategy selected for a producer.
///
/// Only one strategy exists at the moment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerPartitioner {
    /// The default partitioner.
    Default,
}
198
199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
200/// Producer Compression.
201pub enum ProducerCompression {
202    /// None.
203    None,
204    /// Gzip.
205    Gzip,
206    /// Snappy.
207    Snappy,
208    /// Lz4.
209    Lz4,
210    /// Zstd.
211    Zstd,
212}
213
214impl From<ProducerCompression> for Compression {
215    fn from(value: ProducerCompression) -> Self {
216        match value {
217            ProducerCompression::None => Compression::None,
218            ProducerCompression::Gzip => Compression::Gzip,
219            ProducerCompression::Snappy => Compression::Snappy,
220            ProducerCompression::Lz4 => Compression::Lz4,
221            ProducerCompression::Zstd => Compression::Zstd,
222        }
223    }
224}
225
226#[derive(Debug, Clone)]
227/// Producer Config.
228pub struct ProducerConfig {
229    /// Bootstrap Servers.
230    pub bootstrap_servers: Vec<String>,
231    /// Client Id.
232    pub client_id: String,
233    /// Security Protocol.
234    pub security_protocol: SecurityProtocol,
235    /// Tls.
236    pub tls: TlsConfig,
237    /// Sasl.
238    pub sasl: SaslConfig,
239    /// Acks.
240    pub acks: i16,
241    /// Enable Idempotence.
242    pub enable_idempotence: bool,
243    /// Partitioner.
244    pub partitioner: ProducerPartitioner,
245    /// Compression.
246    pub compression: ProducerCompression,
247    /// Batch Size.
248    pub batch_size: usize,
249    /// Linger.
250    pub linger: Duration,
251    /// Delivery Timeout.
252    pub delivery_timeout: Duration,
253    /// Request Timeout.
254    pub request_timeout: Duration,
255    /// Metadata Max Age.
256    pub metadata_max_age: Duration,
257    /// Retry Backoff.
258    pub retry_backoff: Duration,
259    /// Max Retries.
260    pub max_retries: usize,
261    /// Max In Flight Requests Per Connection.
262    pub max_in_flight_requests_per_connection: usize,
263    /// Transactional Id.
264    pub transactional_id: Option<String>,
265    /// Transaction Timeout.
266    pub transaction_timeout: Duration,
267    /// TCP connector.
268    pub tcp_connector: Arc<dyn TcpConnector>,
269}
270
271impl ProducerConfig {
272    /// Creates a new value.
273    pub fn new(bootstrap_server: impl Into<String>) -> Self {
274        Self {
275            bootstrap_servers: vec![bootstrap_server.into()],
276            client_id: "rust-producer".to_owned(),
277            security_protocol: SecurityProtocol::Plaintext,
278            tls: TlsConfig::default(),
279            sasl: SaslConfig::default(),
280            acks: 1,
281            enable_idempotence: false,
282            partitioner: ProducerPartitioner::Default,
283            compression: ProducerCompression::None,
284            batch_size: 16 * 1024,
285            linger: Duration::ZERO,
286            delivery_timeout: Duration::from_secs(120),
287            request_timeout: Duration::from_secs(5),
288            metadata_max_age: Duration::from_secs(30),
289            retry_backoff: Duration::from_millis(250),
290            max_retries: 3,
291            max_in_flight_requests_per_connection: 5,
292            transactional_id: None,
293            transaction_timeout: Duration::from_secs(30),
294            tcp_connector: Arc::new(TokioTcpConnector),
295        }
296    }
297
298    /// Sets client id and returns the updated value.
299    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
300        self.client_id = client_id.into();
301        self
302    }
303
304    /// Sets bootstrap servers and returns the updated value.
305    pub fn with_bootstrap_servers(
306        mut self,
307        servers: impl IntoIterator<Item = impl Into<String>>,
308    ) -> Self {
309        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
310        self
311    }
312
313    /// Sets security protocol and returns the updated value.
314    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
315        self.security_protocol = security_protocol;
316        self
317    }
318
319    /// Sets tls and returns the updated value.
320    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
321        self.security_protocol = if self.security_protocol.uses_sasl() {
322            SecurityProtocol::SaslSsl
323        } else {
324            SecurityProtocol::Ssl
325        };
326        self.tls = tls;
327        self
328    }
329
330    /// Sets tls ca cert path and returns the updated value.
331    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
332        self.security_protocol = if self.security_protocol.uses_sasl() {
333            SecurityProtocol::SaslSsl
334        } else {
335            SecurityProtocol::Ssl
336        };
337        self.tls = self.tls.with_ca_cert_path(path);
338        self
339    }
340
341    /// Sets tls client auth paths and returns the updated value.
342    pub fn with_tls_client_auth_paths(
343        mut self,
344        cert_path: impl Into<PathBuf>,
345        key_path: impl Into<PathBuf>,
346    ) -> Self {
347        self.security_protocol = if self.security_protocol.uses_sasl() {
348            SecurityProtocol::SaslSsl
349        } else {
350            SecurityProtocol::Ssl
351        };
352        self.tls = self
353            .tls
354            .with_client_cert_path(cert_path)
355            .with_client_key_path(key_path);
356        self
357    }
358
359    /// Sets tls server name and returns the updated value.
360    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
361        self.security_protocol = if self.security_protocol.uses_sasl() {
362            SecurityProtocol::SaslSsl
363        } else {
364            SecurityProtocol::Ssl
365        };
366        self.tls = self.tls.with_server_name(server_name);
367        self
368    }
369
370    /// Sets sasl and returns the updated value.
371    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
372        self.security_protocol = self.security_protocol.with_sasl();
373        self.sasl = sasl;
374        self
375    }
376
377    /// Sets sasl plain and returns the updated value.
378    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
379        self.with_sasl(SaslConfig::plain(username, password))
380    }
381
382    /// Sets sasl scram sha 256 and returns the updated value.
383    pub fn with_sasl_scram_sha_256(
384        self,
385        username: impl Into<String>,
386        password: impl Into<String>,
387    ) -> Self {
388        self.with_sasl(SaslConfig::scram_sha_256(username, password))
389    }
390
391    /// Sets sasl scram sha 512 and returns the updated value.
392    pub fn with_sasl_scram_sha_512(
393        self,
394        username: impl Into<String>,
395        password: impl Into<String>,
396    ) -> Self {
397        self.with_sasl(SaslConfig::scram_sha_512(username, password))
398    }
399
400    /// Sets acks and returns the updated value.
401    pub fn with_acks(mut self, acks: i16) -> Self {
402        self.acks = acks;
403        self
404    }
405
406    /// Sets enable idempotence and returns the updated value.
407    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
408        self.enable_idempotence = enable_idempotence;
409        if enable_idempotence {
410            self.acks = -1;
411            self.max_retries = self.max_retries.max(1);
412        }
413        self
414    }
415
416    /// Sets partitioner and returns the updated value.
417    pub fn with_partitioner(mut self, partitioner: ProducerPartitioner) -> Self {
418        self.partitioner = partitioner;
419        self
420    }
421
422    /// Sets compression and returns the updated value.
423    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
424        self.compression = compression;
425        self
426    }
427
428    /// Sets batch size and returns the updated value.
429    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
430        self.batch_size = batch_size.max(1);
431        self
432    }
433
434    /// Sets linger and returns the updated value.
435    pub fn with_linger(mut self, linger: Duration) -> Self {
436        self.linger = linger;
437        self
438    }
439
440    /// Sets delivery timeout and returns the updated value.
441    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
442        self.delivery_timeout = delivery_timeout;
443        self
444    }
445
446    /// Sets request timeout and returns the updated value.
447    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
448        self.request_timeout = request_timeout;
449        self
450    }
451
452    /// Sets metadata max age and returns the updated value.
453    pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
454        self.metadata_max_age = metadata_max_age;
455        self
456    }
457
458    /// Sets retry backoff and returns the updated value.
459    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
460        self.retry_backoff = retry_backoff;
461        self
462    }
463
464    /// Sets max retries and returns the updated value.
465    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
466        self.max_retries = max_retries;
467        self
468    }
469
470    /// Sets max in-flight requests per broker connection and returns the updated value.
471    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
472        self.max_in_flight_requests_per_connection = max_in_flight.max(1);
473        self
474    }
475
476    /// Sets transactional id and returns the updated value.
477    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
478        self.transactional_id = Some(transactional_id.into());
479        self.acks = -1;
480        self.enable_idempotence = true;
481        self
482    }
483
484    /// Sets transaction timeout and returns the updated value.
485    pub fn with_transaction_timeout(mut self, transaction_timeout: Duration) -> Self {
486        self.transaction_timeout = transaction_timeout;
487        self
488    }
489
490    /// Sets TCP connector and returns the updated value.
491    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
492        self.tcp_connector = tcp_connector;
493        self
494    }
495
496    /// Returns whether transactional.
497    pub fn is_transactional(&self) -> bool {
498        self.transactional_id.is_some()
499    }
500
501    /// Returns whether idempotent.
502    pub fn is_idempotent(&self) -> bool {
503        self.enable_idempotence
504    }
505}
506
507/// Alias for client config.
508pub type ClientConfig = ProducerConfig;
509
510#[derive(Debug, Clone)]
511/// Admin Config.
512pub struct AdminConfig {
513    /// Bootstrap Servers.
514    pub bootstrap_servers: Vec<String>,
515    /// Client Id.
516    pub client_id: String,
517    /// Security Protocol.
518    pub security_protocol: SecurityProtocol,
519    /// Tls.
520    pub tls: TlsConfig,
521    /// Sasl.
522    pub sasl: SaslConfig,
523    /// Request Timeout.
524    pub request_timeout: Duration,
525    /// TCP connector.
526    pub tcp_connector: Arc<dyn TcpConnector>,
527}
528
529impl AdminConfig {
530    /// Creates a new value.
531    pub fn new(bootstrap_server: impl Into<String>) -> Self {
532        Self {
533            bootstrap_servers: vec![bootstrap_server.into()],
534            client_id: "rust-admin".to_owned(),
535            security_protocol: SecurityProtocol::Plaintext,
536            tls: TlsConfig::default(),
537            sasl: SaslConfig::default(),
538            request_timeout: Duration::from_secs(5),
539            tcp_connector: Arc::new(TokioTcpConnector),
540        }
541    }
542
543    /// Sets client id and returns the updated value.
544    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
545        self.client_id = client_id.into();
546        self
547    }
548
549    /// Sets bootstrap servers and returns the updated value.
550    pub fn with_bootstrap_servers(
551        mut self,
552        servers: impl IntoIterator<Item = impl Into<String>>,
553    ) -> Self {
554        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
555        self
556    }
557
558    /// Sets security protocol and returns the updated value.
559    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
560        self.security_protocol = security_protocol;
561        self
562    }
563
564    /// Sets tls and returns the updated value.
565    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
566        self.security_protocol = if self.security_protocol.uses_sasl() {
567            SecurityProtocol::SaslSsl
568        } else {
569            SecurityProtocol::Ssl
570        };
571        self.tls = tls;
572        self
573    }
574
575    /// Sets tls ca cert path and returns the updated value.
576    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
577        self.security_protocol = if self.security_protocol.uses_sasl() {
578            SecurityProtocol::SaslSsl
579        } else {
580            SecurityProtocol::Ssl
581        };
582        self.tls = self.tls.with_ca_cert_path(path);
583        self
584    }
585
586    /// Sets tls client auth paths and returns the updated value.
587    pub fn with_tls_client_auth_paths(
588        mut self,
589        cert_path: impl Into<PathBuf>,
590        key_path: impl Into<PathBuf>,
591    ) -> Self {
592        self.security_protocol = if self.security_protocol.uses_sasl() {
593            SecurityProtocol::SaslSsl
594        } else {
595            SecurityProtocol::Ssl
596        };
597        self.tls = self
598            .tls
599            .with_client_cert_path(cert_path)
600            .with_client_key_path(key_path);
601        self
602    }
603
604    /// Sets tls server name and returns the updated value.
605    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
606        self.security_protocol = if self.security_protocol.uses_sasl() {
607            SecurityProtocol::SaslSsl
608        } else {
609            SecurityProtocol::Ssl
610        };
611        self.tls = self.tls.with_server_name(server_name);
612        self
613    }
614
615    /// Sets sasl and returns the updated value.
616    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
617        self.security_protocol = self.security_protocol.with_sasl();
618        self.sasl = sasl;
619        self
620    }
621
622    /// Sets sasl plain and returns the updated value.
623    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
624        self.with_sasl(SaslConfig::plain(username, password))
625    }
626
627    /// Sets sasl scram sha 256 and returns the updated value.
628    pub fn with_sasl_scram_sha_256(
629        self,
630        username: impl Into<String>,
631        password: impl Into<String>,
632    ) -> Self {
633        self.with_sasl(SaslConfig::scram_sha_256(username, password))
634    }
635
636    /// Sets sasl scram sha 512 and returns the updated value.
637    pub fn with_sasl_scram_sha_512(
638        self,
639        username: impl Into<String>,
640        password: impl Into<String>,
641    ) -> Self {
642        self.with_sasl(SaslConfig::scram_sha_512(username, password))
643    }
644
645    /// Sets request timeout and returns the updated value.
646    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
647        self.request_timeout = request_timeout;
648        self
649    }
650
651    /// Sets TCP connector and returns the updated value.
652    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
653        self.tcp_connector = tcp_connector;
654        self
655    }
656}
657
/// Offset reset policy for a consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Reset to the earliest offset.
    Earliest,
    /// Reset to the latest offset.
    Latest,
}
666
667#[derive(Debug, Clone, Copy, PartialEq, Eq)]
668/// Isolation Level.
669pub enum IsolationLevel {
670    /// Read uncommitted.
671    ReadUncommitted,
672    /// Read committed.
673    ReadCommitted,
674}
675
676impl IsolationLevel {
677    /// As Protocol Value.
678    pub fn as_protocol_value(self) -> i8 {
679        match self {
680            Self::ReadUncommitted => READ_UNCOMMITTED,
681            Self::ReadCommitted => READ_COMMITTED,
682        }
683    }
684}
685
686#[derive(Debug, Clone)]
687/// Consumer Config.
688pub struct ConsumerConfig {
689    /// Bootstrap Servers.
690    pub bootstrap_servers: Vec<String>,
691    /// Client Id.
692    pub client_id: String,
693    /// Group Id.
694    pub group_id: String,
695    /// Security Protocol.
696    pub security_protocol: SecurityProtocol,
697    /// Tls.
698    pub tls: TlsConfig,
699    /// Sasl.
700    pub sasl: SaslConfig,
701    /// Request Timeout.
702    pub request_timeout: Duration,
703    /// Metadata Max Age.
704    pub metadata_max_age: Duration,
705    /// Retry Backoff.
706    pub retry_backoff: Duration,
707    /// Max Retries.
708    pub max_retries: usize,
709    /// Rebalance Timeout.
710    pub rebalance_timeout: Duration,
711    /// Fetch Max Wait.
712    pub fetch_max_wait: Duration,
713    /// Fetch Min Bytes.
714    pub fetch_min_bytes: i32,
715    /// Fetch Max Bytes.
716    pub fetch_max_bytes: i32,
717    /// Partition Max Bytes.
718    pub partition_max_bytes: i32,
719    /// Auto Offset Reset.
720    pub auto_offset_reset: AutoOffsetReset,
721    /// Isolation Level.
722    pub isolation_level: IsolationLevel,
723    /// Enable Auto Commit.
724    pub enable_auto_commit: bool,
725    /// Auto Commit Interval.
726    pub auto_commit_interval: Duration,
727    /// Server Assignor.
728    pub server_assignor: Option<String>,
729    /// Rack Id.
730    pub rack_id: Option<String>,
731    /// Instance Id.
732    pub instance_id: Option<String>,
733    /// Rebalance Listener.
734    pub rebalance_listener: Option<ConsumerRebalanceListener>,
735    /// TCP connector.
736    pub tcp_connector: Arc<dyn TcpConnector>,
737}
738
739impl ConsumerConfig {
740    /// Creates a new value.
741    pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
742        Self {
743            bootstrap_servers: vec![bootstrap_server.into()],
744            client_id: "rust-consumer".to_owned(),
745            group_id: group_id.into(),
746            security_protocol: SecurityProtocol::Plaintext,
747            tls: TlsConfig::default(),
748            sasl: SaslConfig::default(),
749            request_timeout: Duration::from_secs(5),
750            metadata_max_age: Duration::from_secs(30),
751            retry_backoff: Duration::from_millis(250),
752            max_retries: 3,
753            rebalance_timeout: Duration::from_secs(30),
754            fetch_max_wait: Duration::from_millis(500),
755            fetch_min_bytes: 1,
756            fetch_max_bytes: 50 * 1024 * 1024,
757            partition_max_bytes: 1024 * 1024,
758            auto_offset_reset: AutoOffsetReset::Earliest,
759            isolation_level: IsolationLevel::ReadUncommitted,
760            enable_auto_commit: false,
761            auto_commit_interval: Duration::from_secs(5),
762            server_assignor: None,
763            rack_id: None,
764            instance_id: None,
765            rebalance_listener: None,
766            tcp_connector: Arc::new(TokioTcpConnector),
767        }
768    }
769
770    /// Sets client id and returns the updated value.
771    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
772        self.client_id = client_id.into();
773        self
774    }
775
776    /// Sets bootstrap servers and returns the updated value.
777    pub fn with_bootstrap_servers(
778        mut self,
779        servers: impl IntoIterator<Item = impl Into<String>>,
780    ) -> Self {
781        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
782        self
783    }
784
785    /// Sets security protocol and returns the updated value.
786    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
787        self.security_protocol = security_protocol;
788        self
789    }
790
791    /// Sets tls and returns the updated value.
792    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
793        self.security_protocol = if self.security_protocol.uses_sasl() {
794            SecurityProtocol::SaslSsl
795        } else {
796            SecurityProtocol::Ssl
797        };
798        self.tls = tls;
799        self
800    }
801
802    /// Sets tls ca cert path and returns the updated value.
803    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
804        self.security_protocol = if self.security_protocol.uses_sasl() {
805            SecurityProtocol::SaslSsl
806        } else {
807            SecurityProtocol::Ssl
808        };
809        self.tls = self.tls.with_ca_cert_path(path);
810        self
811    }
812
813    /// Sets tls client auth paths and returns the updated value.
814    pub fn with_tls_client_auth_paths(
815        mut self,
816        cert_path: impl Into<PathBuf>,
817        key_path: impl Into<PathBuf>,
818    ) -> Self {
819        self.security_protocol = if self.security_protocol.uses_sasl() {
820            SecurityProtocol::SaslSsl
821        } else {
822            SecurityProtocol::Ssl
823        };
824        self.tls = self
825            .tls
826            .with_client_cert_path(cert_path)
827            .with_client_key_path(key_path);
828        self
829    }
830
831    /// Sets tls server name and returns the updated value.
832    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
833        self.security_protocol = if self.security_protocol.uses_sasl() {
834            SecurityProtocol::SaslSsl
835        } else {
836            SecurityProtocol::Ssl
837        };
838        self.tls = self.tls.with_server_name(server_name);
839        self
840    }
841
842    /// Sets sasl and returns the updated value.
843    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
844        self.security_protocol = self.security_protocol.with_sasl();
845        self.sasl = sasl;
846        self
847    }
848
849    /// Sets sasl plain and returns the updated value.
850    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
851        self.with_sasl(SaslConfig::plain(username, password))
852    }
853
854    /// Sets sasl scram sha 256 and returns the updated value.
855    pub fn with_sasl_scram_sha_256(
856        self,
857        username: impl Into<String>,
858        password: impl Into<String>,
859    ) -> Self {
860        self.with_sasl(SaslConfig::scram_sha_256(username, password))
861    }
862
863    /// Sets sasl scram sha 512 and returns the updated value.
864    pub fn with_sasl_scram_sha_512(
865        self,
866        username: impl Into<String>,
867        password: impl Into<String>,
868    ) -> Self {
869        self.with_sasl(SaslConfig::scram_sha_512(username, password))
870    }
871
872    /// Sets request timeout and returns the updated value.
873    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
874        self.request_timeout = request_timeout;
875        self
876    }
877
878    /// Sets metadata max age and returns the updated value.
879    pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
880        self.metadata_max_age = metadata_max_age;
881        self
882    }
883
884    /// Sets retry backoff and returns the updated value.
885    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
886        self.retry_backoff = retry_backoff;
887        self
888    }
889
890    /// Sets max retries and returns the updated value.
891    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
892        self.max_retries = max_retries;
893        self
894    }
895
896    /// Sets rebalance timeout and returns the updated value.
897    pub fn with_rebalance_timeout(mut self, rebalance_timeout: Duration) -> Self {
898        self.rebalance_timeout = rebalance_timeout;
899        self
900    }
901
902    /// Sets fetch max wait and returns the updated value.
903    pub fn with_fetch_max_wait(mut self, fetch_max_wait: Duration) -> Self {
904        self.fetch_max_wait = fetch_max_wait;
905        self
906    }
907
908    /// Sets fetch min bytes and returns the updated value.
909    pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
910        self.fetch_min_bytes = fetch_min_bytes;
911        self
912    }
913
914    /// Sets fetch max bytes and returns the updated value.
915    pub fn with_fetch_max_bytes(mut self, fetch_max_bytes: i32) -> Self {
916        self.fetch_max_bytes = fetch_max_bytes;
917        self
918    }
919
920    /// Sets partition max bytes and returns the updated value.
921    pub fn with_partition_max_bytes(mut self, partition_max_bytes: i32) -> Self {
922        self.partition_max_bytes = partition_max_bytes;
923        self
924    }
925
926    /// Sets auto offset reset and returns the updated value.
927    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
928        self.auto_offset_reset = auto_offset_reset;
929        self
930    }
931
932    /// Sets isolation level and returns the updated value.
933    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
934        self.isolation_level = isolation_level;
935        self
936    }
937
938    /// Sets enable auto commit and returns the updated value.
939    pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
940        self.enable_auto_commit = enable_auto_commit;
941        self
942    }
943
944    /// Sets auto commit interval and returns the updated value.
945    pub fn with_auto_commit_interval(mut self, auto_commit_interval: Duration) -> Self {
946        self.auto_commit_interval = auto_commit_interval;
947        self
948    }
949
950    /// Sets server assignor and returns the updated value.
951    pub fn with_server_assignor(mut self, server_assignor: impl Into<String>) -> Self {
952        self.server_assignor = Some(server_assignor.into());
953        self
954    }
955
956    /// Sets rack id and returns the updated value.
957    pub fn with_rack_id(mut self, rack_id: impl Into<String>) -> Self {
958        self.rack_id = Some(rack_id.into());
959        self
960    }
961
962    /// Sets instance id and returns the updated value.
963    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
964        self.instance_id = Some(instance_id.into());
965        self
966    }
967
968    /// Sets rebalance listener and returns the updated value.
969    pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
970        self.rebalance_listener = Some(listener);
971        self
972    }
973
974    /// Sets rebalance callback and returns the updated value.
975    pub fn with_rebalance_callback(
976        self,
977        callback: impl Fn(crate::types::ConsumerRebalanceEvent) + Send + Sync + 'static,
978    ) -> Self {
979        self.with_rebalance_listener(ConsumerRebalanceListener::new(callback))
980    }
981
982    /// Sets TCP connector and returns the updated value.
983    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
984        self.tcp_connector = tcp_connector;
985        self
986    }
987}
988
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Builds the `Option<&Path>` shape that the TLS path fields are compared
    /// against after `as_deref()`.
    fn some_path(raw: &str) -> Option<&Path> {
        Some(Path::new(raw))
    }

    /// Supplying a CA certificate path on a producer selects `Ssl`.
    #[test]
    fn producer_tls_builder_enables_ssl() {
        let producer =
            ProducerConfig::new("localhost:9093").with_tls_ca_cert_path("/tmp/cluster-ca.pem");

        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            producer.tls.ca_cert_path.as_deref(),
            some_path("/tmp/cluster-ca.pem")
        );
    }

    /// SASL/PLAIN credentials on a fresh producer select `SaslPlaintext` and
    /// are recorded verbatim.
    #[test]
    fn producer_sasl_plain_builder_enables_sasl_plaintext() {
        let producer = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(producer.sasl.mechanism, SaslMechanism::Plain);
        assert_eq!(producer.sasl.username.as_deref(), Some("user-a"));
        assert_eq!(producer.sasl.password.as_deref(), Some("secret-a"));
    }

    /// SCRAM-SHA-256 credentials on a fresh producer select `SaslPlaintext`
    /// with the matching mechanism.
    #[test]
    fn producer_sasl_scram_builder_enables_sasl_plaintext() {
        let producer =
            ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256("user-a", "secret-a");

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(producer.sasl.mechanism, SaslMechanism::ScramSha256);
        assert_eq!(producer.sasl.username.as_deref(), Some("user-a"));
        assert_eq!(producer.sasl.password.as_deref(), Some("secret-a"));
    }

    /// Combining TLS and SASL yields `SaslSsl` regardless of call order.
    #[test]
    fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
        let sasl_then_tls = ProducerConfig::new("localhost:9093")
            .with_sasl_plain("user-a", "secret-a")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
        assert_eq!(sasl_then_tls.security_protocol, SecurityProtocol::SaslSsl);

        let tls_then_sasl = ProducerConfig::new("localhost:9093")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
            .with_sasl_plain("user-a", "secret-a");
        assert_eq!(tls_then_sasl.security_protocol, SecurityProtocol::SaslSsl);
    }

    /// `with_bootstrap_servers` records every broker address in order.
    #[test]
    fn producer_multi_broker_with_bootstrap_servers() {
        let producer = ProducerConfig::new("host1:9092").with_bootstrap_servers([
            "host1:9092",
            "host2:9092",
            "host3:9092",
        ]);

        let expected = vec!["host1:9092", "host2:9092", "host3:9092"];
        assert_eq!(producer.bootstrap_servers, expected);
    }

    /// Exercises the producer tuning and transaction setters in one chain and
    /// checks each recorded value.
    #[test]
    fn producer_builder_records_tuning_and_transaction_options() {
        let producer = ProducerConfig::new("localhost:9092")
            .with_client_id("producer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_tls_server_name("kafka.internal")
            .with_acks(-1)
            .with_enable_idempotence(true)
            .with_partitioner(ProducerPartitioner::Default)
            .with_compression(ProducerCompression::Zstd)
            .with_batch_size(0)
            .with_linger(Duration::from_millis(50))
            .with_delivery_timeout(Duration::from_secs(10))
            .with_request_timeout(Duration::from_secs(2))
            .with_metadata_max_age(Duration::from_secs(60))
            .with_retry_backoff(Duration::from_millis(75))
            .with_max_retries(7)
            .with_transactional_id("tx-a")
            .with_transaction_timeout(Duration::from_secs(45));

        // Identity and transport security.
        assert_eq!(producer.client_id, "producer-a");
        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            producer.tls.client_cert_path.as_deref(),
            some_path("/tmp/client.crt")
        );
        assert_eq!(
            producer.tls.client_key_path.as_deref(),
            some_path("/tmp/client.key")
        );
        assert_eq!(producer.tls.server_name.as_deref(), Some("kafka.internal"));

        // Delivery semantics and batching; a batch size of 0 is expected to
        // be recorded as 1.
        assert_eq!(producer.acks, -1);
        assert!(producer.is_idempotent());
        assert_eq!(producer.compression, ProducerCompression::Zstd);
        assert_eq!(producer.batch_size, 1);

        // Timers and retry policy.
        assert_eq!(producer.linger, Duration::from_millis(50));
        assert_eq!(producer.delivery_timeout, Duration::from_secs(10));
        assert_eq!(producer.request_timeout, Duration::from_secs(2));
        assert_eq!(producer.metadata_max_age, Duration::from_secs(60));
        assert_eq!(producer.retry_backoff, Duration::from_millis(75));
        assert_eq!(producer.max_retries, 7);

        // Transaction settings.
        assert_eq!(producer.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(producer.transaction_timeout, Duration::from_secs(45));
        assert!(producer.is_transactional());
    }

    /// A `TlsConfig` passed to the consumer keeps its server-name override
    /// and selects `Ssl`.
    #[test]
    fn consumer_tls_builder_keeps_override_server_name() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let consumer = ConsumerConfig::new("localhost:9093", "group-a").with_tls(tls);

        assert_eq!(consumer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(consumer.tls.server_name.as_deref(), Some("kafka.internal"));
    }

    /// `with_bootstrap_servers` on a consumer records every broker address.
    #[test]
    fn consumer_multi_broker_with_bootstrap_servers() {
        let consumer = ConsumerConfig::new("host1:9092", "group-a")
            .with_bootstrap_servers(["host1:9092", "host2:9092"]);

        let expected = vec!["host1:9092", "host2:9092"];
        assert_eq!(consumer.bootstrap_servers, expected);
    }

    /// Client-auth paths on an admin config are recorded and select `Ssl`.
    #[test]
    fn admin_tls_builder_records_client_auth_paths() {
        let admin = AdminConfig::new("localhost:9093")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key");

        assert_eq!(admin.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            admin.tls.client_cert_path.as_deref(),
            some_path("/tmp/client.crt")
        );
        assert_eq!(
            admin.tls.client_key_path.as_deref(),
            some_path("/tmp/client.key")
        );
    }

    /// Covers the admin and consumer setters not exercised by the other
    /// tests.
    #[test]
    fn admin_and_consumer_builders_record_remaining_options() {
        let admin = AdminConfig::new("localhost:9092")
            .with_client_id("admin-a")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .with_security_protocol(SecurityProtocol::Plaintext)
            .with_tls(TlsConfig::new().with_ca_cert_path("/tmp/ca.pem"))
            .with_sasl_scram_sha_512("user-a", "secret-a")
            .with_request_timeout(Duration::from_secs(8));

        assert_eq!(admin.bootstrap_servers, vec!["host-a:9092", "host-b:9092"]);
        assert_eq!(admin.client_id, "admin-a");
        assert_eq!(admin.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(admin.tls.ca_cert_path.as_deref(), some_path("/tmp/ca.pem"));
        assert_eq!(admin.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(admin.request_timeout, Duration::from_secs(8));

        let consumer = ConsumerConfig::new("localhost:9092", "group-a")
            .with_client_id("consumer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_ca_cert_path("/tmp/ca.pem")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_sasl_scram_sha_512("user-b", "secret-b")
            .with_request_timeout(Duration::from_secs(3))
            .with_metadata_max_age(Duration::from_secs(4))
            .with_retry_backoff(Duration::from_millis(5))
            .with_max_retries(6)
            .with_rebalance_timeout(Duration::from_secs(7))
            .with_fetch_max_wait(Duration::from_millis(8))
            .with_fetch_min_bytes(9)
            .with_fetch_max_bytes(10)
            .with_partition_max_bytes(11)
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(Duration::from_secs(12))
            .with_server_assignor("range")
            .with_rack_id("rack-a")
            .with_instance_id("instance-a");

        // Identity and security.
        assert_eq!(consumer.client_id, "consumer-a");
        assert_eq!(consumer.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(consumer.sasl.mechanism, SaslMechanism::ScramSha512);

        // Timers and retry policy.
        assert_eq!(consumer.request_timeout, Duration::from_secs(3));
        assert_eq!(consumer.metadata_max_age, Duration::from_secs(4));
        assert_eq!(consumer.retry_backoff, Duration::from_millis(5));
        assert_eq!(consumer.max_retries, 6);
        assert_eq!(consumer.rebalance_timeout, Duration::from_secs(7));

        // Fetch tuning.
        assert_eq!(consumer.fetch_max_wait, Duration::from_millis(8));
        assert_eq!(consumer.fetch_min_bytes, 9);
        assert_eq!(consumer.fetch_max_bytes, 10);
        assert_eq!(consumer.partition_max_bytes, 11);

        // Offset handling.
        assert_eq!(consumer.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(consumer.isolation_level, IsolationLevel::ReadCommitted);
        assert!(consumer.enable_auto_commit);
        assert_eq!(consumer.auto_commit_interval, Duration::from_secs(12));

        // Group membership metadata.
        assert_eq!(consumer.server_assignor.as_deref(), Some("range"));
        assert_eq!(consumer.rack_id.as_deref(), Some("rack-a"));
        assert_eq!(consumer.instance_id.as_deref(), Some("instance-a"));
    }
}