Skip to main content

kafkit_client/core/
config.rs

1//! Configuration types used to build clients.
2//!
//! The direct config structs are useful when you want full control. For
//! day-to-day code, [`KafkaClient`](crate::KafkaClient) and its builders are a
//! little more convenient.
6//!
7//! ```no_run
8//! use std::time::Duration;
9//! use kafkit_client::{ProducerConfig, SecurityProtocol};
10//!
11//! let config = ProducerConfig::new("localhost:9092")
12//!     .with_client_id("orders-writer")
13//!     .with_security_protocol(SecurityProtocol::Plaintext)
14//!     .with_linger(Duration::from_millis(5));
15//! ```
16//!
17use std::fmt;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21
22use kafka_protocol::records::Compression;
23
24use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
25use crate::network::{TcpConnector, TokioTcpConnector};
26use crate::types::ConsumerRebalanceListener;
27
/// Transport security mode a client uses to reach its brokers.
///
/// The four variants are the combinations of "TLS or not" and "SASL or not";
/// [`SecurityProtocol::uses_tls`] and [`SecurityProtocol::uses_sasl`] report
/// each axis independently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL.
    Plaintext,
    /// TLS without SASL.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL together with TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns `true` for the TLS-based variants (`Ssl` and `SaslSsl`).
    pub fn uses_tls(self) -> bool {
        match self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns `true` for the SASL-based variants (`SaslPlaintext` and `SaslSsl`).
    pub fn uses_sasl(self) -> bool {
        match self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }

    /// Returns the SASL-enabled counterpart of this protocol, keeping its TLS
    /// setting unchanged.
    fn with_sasl(self) -> Self {
        match self {
            Self::Plaintext | Self::SaslPlaintext => Self::SaslPlaintext,
            Self::Ssl | Self::SaslSsl => Self::SaslSsl,
        }
    }
}
60
/// SASL authentication mechanism; [`SaslMechanism::Plain`] is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SaslMechanism {
    /// The `PLAIN` mechanism.
    #[default]
    Plain,
    /// The `SCRAM-SHA-256` mechanism.
    ScramSha256,
    /// The `SCRAM-SHA-512` mechanism.
    ScramSha512,
}

impl SaslMechanism {
    /// Returns the upper-case mechanism name: `"PLAIN"`, `"SCRAM-SHA-256"` or
    /// `"SCRAM-SHA-512"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::Plain => "PLAIN",
        }
    }

    /// Returns `true` for the two SCRAM variants and `false` for `Plain`.
    pub fn is_scram(self) -> bool {
        // Exactly the SCRAM variants have a numeric SCRAM type.
        self.scram_type().is_some()
    }

    /// Returns the numeric SCRAM type: `1` for SHA-256, `2` for SHA-512, and
    /// `None` for `Plain`.
    pub fn scram_type(self) -> Option<i8> {
        let code = match self {
            Self::Plain => return None,
            Self::ScramSha256 => 1,
            Self::ScramSha512 => 2,
        };
        Some(code)
    }
}
97
98#[derive(Clone, Default, PartialEq, Eq)]
99/// Sasl Config.
100pub struct SaslConfig {
101    /// Mechanism.
102    pub mechanism: SaslMechanism,
103    /// Username.
104    pub username: Option<String>,
105    /// Password.
106    pub password: Option<String>,
107    /// Authorization Id.
108    pub authorization_id: Option<String>,
109}
110
111impl fmt::Debug for SaslConfig {
112    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
113        formatter
114            .debug_struct("SaslConfig")
115            .field("mechanism", &self.mechanism)
116            .field("username", &self.username)
117            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
118            .field("authorization_id", &self.authorization_id)
119            .finish()
120    }
121}
122
123impl SaslConfig {
124    /// Plain.
125    pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
126        Self {
127            mechanism: SaslMechanism::Plain,
128            username: Some(username.into()),
129            password: Some(password.into()),
130            authorization_id: None,
131        }
132    }
133
134    /// Scram Sha 256.
135    pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
136        Self {
137            mechanism: SaslMechanism::ScramSha256,
138            username: Some(username.into()),
139            password: Some(password.into()),
140            authorization_id: None,
141        }
142    }
143
144    /// Scram Sha 512.
145    pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
146        Self {
147            mechanism: SaslMechanism::ScramSha512,
148            username: Some(username.into()),
149            password: Some(password.into()),
150            authorization_id: None,
151        }
152    }
153
154    /// Sets authorization id and returns the updated value.
155    pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
156        self.authorization_id = Some(authorization_id.into());
157        self
158    }
159}
160
/// Optional file locations and server name used for TLS connections.
///
/// Every field starts out as `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// Path to a CA certificate, if one was given.
    pub ca_cert_path: Option<PathBuf>,
    /// Path to the client certificate, if one was given.
    pub client_cert_path: Option<PathBuf>,
    /// Path to the client private key, if one was given.
    pub client_key_path: Option<PathBuf>,
    /// Server name, if one was given.
    // NOTE(review): presumably the name used for certificate verification / SNI —
    // confirm against the TLS connector.
    pub server_name: Option<String>,
}

impl TlsConfig {
    /// Creates an empty TLS configuration (identical to `TlsConfig::default()`).
    pub fn new() -> Self {
        Self {
            ca_cert_path: None,
            client_cert_path: None,
            client_key_path: None,
            server_name: None,
        }
    }

    /// Stores the CA certificate path and returns the updated configuration.
    pub fn with_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Stores the client certificate path and returns the updated configuration.
    pub fn with_client_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Stores the client key path and returns the updated configuration.
    pub fn with_client_key_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_key_path: Some(path.into()),
            ..self
        }
    }

    /// Stores the server name and returns the updated configuration.
    pub fn with_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
204
/// Partitioner selection for a producer.
///
/// `Default` is the only variant defined here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerPartitioner {
    /// The default partitioner.
    Default,
}
211
/// Compression codec choice for a producer.
///
/// Converts into `kafka_protocol::records::Compression` through the `From`
/// impl in this module, variant for variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerCompression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Snappy compression.
    Snappy,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
}
226
227impl From<ProducerCompression> for Compression {
228    fn from(value: ProducerCompression) -> Self {
229        match value {
230            ProducerCompression::None => Compression::None,
231            ProducerCompression::Gzip => Compression::Gzip,
232            ProducerCompression::Snappy => Compression::Snappy,
233            ProducerCompression::Lz4 => Compression::Lz4,
234            ProducerCompression::Zstd => Compression::Zstd,
235        }
236    }
237}
238
/// Settings for a producer client.
///
/// All fields are public. The `with_*` builder methods additionally keep
/// related fields in sync (for example, the TLS/SASL helpers update
/// `security_protocol`, and enabling idempotence adjusts `acks`).
/// Defaults quoted below are the values assigned by [`ProducerConfig::new`].
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Bootstrap server addresses; `new` starts with a single entry.
    pub bootstrap_servers: Vec<String>,
    /// Client id; defaults to `"rust-producer"`.
    pub client_id: String,
    /// Security protocol; defaults to `Plaintext` and is upgraded by the
    /// TLS/SASL builder methods.
    pub security_protocol: SecurityProtocol,
    /// TLS settings; empty by default.
    pub tls: TlsConfig,
    /// SASL settings; `SaslConfig::default()` by default.
    pub sasl: SaslConfig,
    /// Required acknowledgements; defaults to `1`, and is set to `-1` when
    /// idempotence or a transactional id is enabled through the builders.
    pub acks: i16,
    /// Whether idempotence is enabled; defaults to `false`.
    pub enable_idempotence: bool,
    /// Partitioner; defaults to `ProducerPartitioner::Default`.
    pub partitioner: ProducerPartitioner,
    /// Compression codec; defaults to `ProducerCompression::None`.
    pub compression: ProducerCompression,
    /// Batch size; defaults to `16 * 1024`. The builder clamps it to at least 1.
    pub batch_size: usize,
    /// Total memory available to buffered producer records; defaults to
    /// `32 * 1024 * 1024`.
    pub buffer_memory: usize,
    /// Maximum time producer calls may wait for metadata or buffer capacity;
    /// defaults to 60 seconds.
    pub max_block: Duration,
    /// Maximum serialized Produce request size; defaults to `1024 * 1024`.
    pub max_request_size: usize,
    /// Whether the default partitioner should ignore record keys; defaults
    /// to `false`.
    pub partitioner_ignore_keys: bool,
    /// Linger duration; defaults to zero.
    pub linger: Duration,
    /// Delivery timeout; defaults to 120 seconds.
    pub delivery_timeout: Duration,
    /// Request timeout; defaults to 5 seconds.
    pub request_timeout: Duration,
    /// Maximum metadata age; defaults to 30 seconds.
    pub metadata_max_age: Duration,
    /// Retry backoff; defaults to 250 milliseconds.
    pub retry_backoff: Duration,
    /// Maximum number of retries; defaults to `3`.
    pub max_retries: usize,
    /// Maximum in-flight requests per connection; defaults to `5`. The
    /// builder clamps it to at least 1.
    pub max_in_flight_requests_per_connection: usize,
    /// Transactional id; `None` by default.
    pub transactional_id: Option<String>,
    /// Transaction timeout; defaults to 30 seconds.
    pub transaction_timeout: Duration,
    /// TCP connector; defaults to `TokioTcpConnector`.
    pub tcp_connector: Arc<dyn TcpConnector>,
}
291
292impl ProducerConfig {
293    /// Creates a new value.
294    pub fn new(bootstrap_server: impl Into<String>) -> Self {
295        Self {
296            bootstrap_servers: vec![bootstrap_server.into()],
297            client_id: "rust-producer".to_owned(),
298            security_protocol: SecurityProtocol::Plaintext,
299            tls: TlsConfig::default(),
300            sasl: SaslConfig::default(),
301            acks: 1,
302            enable_idempotence: false,
303            partitioner: ProducerPartitioner::Default,
304            compression: ProducerCompression::None,
305            batch_size: 16 * 1024,
306            buffer_memory: 32 * 1024 * 1024,
307            max_block: Duration::from_secs(60),
308            max_request_size: 1024 * 1024,
309            partitioner_ignore_keys: false,
310            linger: Duration::ZERO,
311            delivery_timeout: Duration::from_secs(120),
312            request_timeout: Duration::from_secs(5),
313            metadata_max_age: Duration::from_secs(30),
314            retry_backoff: Duration::from_millis(250),
315            max_retries: 3,
316            max_in_flight_requests_per_connection: 5,
317            transactional_id: None,
318            transaction_timeout: Duration::from_secs(30),
319            tcp_connector: Arc::new(TokioTcpConnector),
320        }
321    }
322
323    /// Sets client id and returns the updated value.
324    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
325        self.client_id = client_id.into();
326        self
327    }
328
329    /// Sets bootstrap servers and returns the updated value.
330    pub fn with_bootstrap_servers(
331        mut self,
332        servers: impl IntoIterator<Item = impl Into<String>>,
333    ) -> Self {
334        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
335        self
336    }
337
338    /// Sets security protocol and returns the updated value.
339    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
340        self.security_protocol = security_protocol;
341        self
342    }
343
344    /// Sets tls and returns the updated value.
345    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
346        self.security_protocol = if self.security_protocol.uses_sasl() {
347            SecurityProtocol::SaslSsl
348        } else {
349            SecurityProtocol::Ssl
350        };
351        self.tls = tls;
352        self
353    }
354
355    /// Sets tls ca cert path and returns the updated value.
356    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
357        self.security_protocol = if self.security_protocol.uses_sasl() {
358            SecurityProtocol::SaslSsl
359        } else {
360            SecurityProtocol::Ssl
361        };
362        self.tls = self.tls.with_ca_cert_path(path);
363        self
364    }
365
366    /// Sets tls client auth paths and returns the updated value.
367    pub fn with_tls_client_auth_paths(
368        mut self,
369        cert_path: impl Into<PathBuf>,
370        key_path: impl Into<PathBuf>,
371    ) -> Self {
372        self.security_protocol = if self.security_protocol.uses_sasl() {
373            SecurityProtocol::SaslSsl
374        } else {
375            SecurityProtocol::Ssl
376        };
377        self.tls = self
378            .tls
379            .with_client_cert_path(cert_path)
380            .with_client_key_path(key_path);
381        self
382    }
383
384    /// Sets tls server name and returns the updated value.
385    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
386        self.security_protocol = if self.security_protocol.uses_sasl() {
387            SecurityProtocol::SaslSsl
388        } else {
389            SecurityProtocol::Ssl
390        };
391        self.tls = self.tls.with_server_name(server_name);
392        self
393    }
394
395    /// Sets sasl and returns the updated value.
396    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
397        self.security_protocol = self.security_protocol.with_sasl();
398        self.sasl = sasl;
399        self
400    }
401
402    /// Sets sasl plain and returns the updated value.
403    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
404        self.with_sasl(SaslConfig::plain(username, password))
405    }
406
407    /// Sets sasl scram sha 256 and returns the updated value.
408    pub fn with_sasl_scram_sha_256(
409        self,
410        username: impl Into<String>,
411        password: impl Into<String>,
412    ) -> Self {
413        self.with_sasl(SaslConfig::scram_sha_256(username, password))
414    }
415
416    /// Sets sasl scram sha 512 and returns the updated value.
417    pub fn with_sasl_scram_sha_512(
418        self,
419        username: impl Into<String>,
420        password: impl Into<String>,
421    ) -> Self {
422        self.with_sasl(SaslConfig::scram_sha_512(username, password))
423    }
424
425    /// Sets acks and returns the updated value.
426    pub fn with_acks(mut self, acks: i16) -> Self {
427        self.acks = acks;
428        self
429    }
430
431    /// Sets enable idempotence and returns the updated value.
432    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
433        self.enable_idempotence = enable_idempotence;
434        if enable_idempotence {
435            self.acks = -1;
436            self.max_retries = self.max_retries.max(1);
437        }
438        self
439    }
440
441    /// Sets partitioner and returns the updated value.
442    pub fn with_partitioner(mut self, partitioner: ProducerPartitioner) -> Self {
443        self.partitioner = partitioner;
444        self
445    }
446
447    /// Sets compression and returns the updated value.
448    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
449        self.compression = compression;
450        self
451    }
452
453    /// Sets batch size and returns the updated value.
454    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
455        self.batch_size = batch_size.max(1);
456        self
457    }
458
459    /// Sets total memory available to buffered records and returns the updated value.
460    pub fn with_buffer_memory(mut self, buffer_memory: usize) -> Self {
461        self.buffer_memory = buffer_memory;
462        self
463    }
464
465    /// Sets the maximum time producer calls may wait for metadata or buffer capacity.
466    pub fn with_max_block(mut self, max_block: Duration) -> Self {
467        self.max_block = max_block;
468        self
469    }
470
471    /// Sets maximum serialized Produce request size and returns the updated value.
472    pub fn with_max_request_size(mut self, max_request_size: usize) -> Self {
473        self.max_request_size = max_request_size;
474        self
475    }
476
477    /// Sets whether the default partitioner ignores record keys.
478    pub fn with_partitioner_ignore_keys(mut self, ignore_keys: bool) -> Self {
479        self.partitioner_ignore_keys = ignore_keys;
480        self
481    }
482
483    /// Sets linger and returns the updated value.
484    pub fn with_linger(mut self, linger: Duration) -> Self {
485        self.linger = linger;
486        self
487    }
488
489    /// Sets delivery timeout and returns the updated value.
490    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
491        self.delivery_timeout = delivery_timeout;
492        self
493    }
494
495    /// Sets request timeout and returns the updated value.
496    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
497        self.request_timeout = request_timeout;
498        self
499    }
500
501    /// Sets metadata max age and returns the updated value.
502    pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
503        self.metadata_max_age = metadata_max_age;
504        self
505    }
506
507    /// Sets retry backoff and returns the updated value.
508    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
509        self.retry_backoff = retry_backoff;
510        self
511    }
512
513    /// Sets max retries and returns the updated value.
514    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
515        self.max_retries = max_retries;
516        self
517    }
518
519    /// Sets max in-flight requests per broker connection and returns the updated value.
520    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
521        self.max_in_flight_requests_per_connection = max_in_flight.max(1);
522        self
523    }
524
525    /// Sets transactional id and returns the updated value.
526    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
527        self.transactional_id = Some(transactional_id.into());
528        self.acks = -1;
529        self.enable_idempotence = true;
530        self
531    }
532
533    /// Sets transaction timeout and returns the updated value.
534    pub fn with_transaction_timeout(mut self, transaction_timeout: Duration) -> Self {
535        self.transaction_timeout = transaction_timeout;
536        self
537    }
538
539    /// Sets TCP connector and returns the updated value.
540    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
541        self.tcp_connector = tcp_connector;
542        self
543    }
544
545    /// Returns whether transactional.
546    pub fn is_transactional(&self) -> bool {
547        self.transactional_id.is_some()
548    }
549
550    /// Returns whether idempotent.
551    pub fn is_idempotent(&self) -> bool {
552        self.enable_idempotence
553    }
554}
555
/// Alias of [`ProducerConfig`] under the more general name `ClientConfig`.
pub type ClientConfig = ProducerConfig;
558
/// Settings for an admin client.
///
/// Defaults quoted below are the values assigned by [`AdminConfig::new`].
#[derive(Debug, Clone)]
pub struct AdminConfig {
    /// Bootstrap server addresses; `new` starts with a single entry.
    pub bootstrap_servers: Vec<String>,
    /// Client id; defaults to `"rust-admin"`.
    pub client_id: String,
    /// Security protocol; defaults to `Plaintext` and is upgraded by the
    /// TLS/SASL builder methods.
    pub security_protocol: SecurityProtocol,
    /// TLS settings; empty by default.
    pub tls: TlsConfig,
    /// SASL settings; `SaslConfig::default()` by default.
    pub sasl: SaslConfig,
    /// Request timeout; defaults to 5 seconds.
    pub request_timeout: Duration,
    /// TCP connector; defaults to `TokioTcpConnector`.
    pub tcp_connector: Arc<dyn TcpConnector>,
}
577
578impl AdminConfig {
579    /// Creates a new value.
580    pub fn new(bootstrap_server: impl Into<String>) -> Self {
581        Self {
582            bootstrap_servers: vec![bootstrap_server.into()],
583            client_id: "rust-admin".to_owned(),
584            security_protocol: SecurityProtocol::Plaintext,
585            tls: TlsConfig::default(),
586            sasl: SaslConfig::default(),
587            request_timeout: Duration::from_secs(5),
588            tcp_connector: Arc::new(TokioTcpConnector),
589        }
590    }
591
592    /// Sets client id and returns the updated value.
593    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
594        self.client_id = client_id.into();
595        self
596    }
597
598    /// Sets bootstrap servers and returns the updated value.
599    pub fn with_bootstrap_servers(
600        mut self,
601        servers: impl IntoIterator<Item = impl Into<String>>,
602    ) -> Self {
603        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
604        self
605    }
606
607    /// Sets security protocol and returns the updated value.
608    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
609        self.security_protocol = security_protocol;
610        self
611    }
612
613    /// Sets tls and returns the updated value.
614    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
615        self.security_protocol = if self.security_protocol.uses_sasl() {
616            SecurityProtocol::SaslSsl
617        } else {
618            SecurityProtocol::Ssl
619        };
620        self.tls = tls;
621        self
622    }
623
624    /// Sets tls ca cert path and returns the updated value.
625    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
626        self.security_protocol = if self.security_protocol.uses_sasl() {
627            SecurityProtocol::SaslSsl
628        } else {
629            SecurityProtocol::Ssl
630        };
631        self.tls = self.tls.with_ca_cert_path(path);
632        self
633    }
634
635    /// Sets tls client auth paths and returns the updated value.
636    pub fn with_tls_client_auth_paths(
637        mut self,
638        cert_path: impl Into<PathBuf>,
639        key_path: impl Into<PathBuf>,
640    ) -> Self {
641        self.security_protocol = if self.security_protocol.uses_sasl() {
642            SecurityProtocol::SaslSsl
643        } else {
644            SecurityProtocol::Ssl
645        };
646        self.tls = self
647            .tls
648            .with_client_cert_path(cert_path)
649            .with_client_key_path(key_path);
650        self
651    }
652
653    /// Sets tls server name and returns the updated value.
654    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
655        self.security_protocol = if self.security_protocol.uses_sasl() {
656            SecurityProtocol::SaslSsl
657        } else {
658            SecurityProtocol::Ssl
659        };
660        self.tls = self.tls.with_server_name(server_name);
661        self
662    }
663
664    /// Sets sasl and returns the updated value.
665    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
666        self.security_protocol = self.security_protocol.with_sasl();
667        self.sasl = sasl;
668        self
669    }
670
671    /// Sets sasl plain and returns the updated value.
672    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
673        self.with_sasl(SaslConfig::plain(username, password))
674    }
675
676    /// Sets sasl scram sha 256 and returns the updated value.
677    pub fn with_sasl_scram_sha_256(
678        self,
679        username: impl Into<String>,
680        password: impl Into<String>,
681    ) -> Self {
682        self.with_sasl(SaslConfig::scram_sha_256(username, password))
683    }
684
685    /// Sets sasl scram sha 512 and returns the updated value.
686    pub fn with_sasl_scram_sha_512(
687        self,
688        username: impl Into<String>,
689        password: impl Into<String>,
690    ) -> Self {
691        self.with_sasl(SaslConfig::scram_sha_512(username, password))
692    }
693
694    /// Sets request timeout and returns the updated value.
695    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
696        self.request_timeout = request_timeout;
697        self
698    }
699
700    /// Sets TCP connector and returns the updated value.
701    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
702        self.tcp_connector = tcp_connector;
703        self
704    }
705}
706
/// Offset reset policy for a consumer.
// NOTE(review): presumably applied when the consumer has no usable committed
// offset — confirm against the consumer implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Reset to the earliest offset.
    Earliest,
    /// Reset to the latest offset.
    Latest,
}
715
716#[derive(Debug, Clone, Copy, PartialEq, Eq)]
717/// Isolation Level.
718pub enum IsolationLevel {
719    /// Read uncommitted.
720    ReadUncommitted,
721    /// Read committed.
722    ReadCommitted,
723}
724
725impl IsolationLevel {
726    /// As Protocol Value.
727    pub fn as_protocol_value(self) -> i8 {
728        match self {
729            Self::ReadUncommitted => READ_UNCOMMITTED,
730            Self::ReadCommitted => READ_COMMITTED,
731        }
732    }
733}
734
/// Settings for a consumer client.
///
/// Defaults quoted below are the values assigned by [`ConsumerConfig::new`].
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Bootstrap server addresses; `new` starts with a single entry.
    pub bootstrap_servers: Vec<String>,
    /// Client id; defaults to `"rust-consumer"`.
    pub client_id: String,
    /// Consumer group id, supplied to `new`.
    pub group_id: String,
    /// Security protocol; defaults to `Plaintext` and is upgraded by the
    /// TLS/SASL builder methods.
    pub security_protocol: SecurityProtocol,
    /// TLS settings; empty by default.
    pub tls: TlsConfig,
    /// SASL settings; `SaslConfig::default()` by default.
    pub sasl: SaslConfig,
    /// Request timeout; defaults to 5 seconds.
    pub request_timeout: Duration,
    /// Maximum metadata age; defaults to 30 seconds.
    pub metadata_max_age: Duration,
    /// Retry backoff; defaults to 250 milliseconds.
    pub retry_backoff: Duration,
    /// Maximum number of retries; defaults to `3`.
    pub max_retries: usize,
    /// Rebalance timeout; defaults to 30 seconds.
    pub rebalance_timeout: Duration,
    /// Maximum fetch wait; defaults to 500 milliseconds.
    pub fetch_max_wait: Duration,
    /// Minimum fetch bytes; defaults to `1`.
    pub fetch_min_bytes: i32,
    /// Maximum fetch bytes; defaults to `50 * 1024 * 1024`.
    pub fetch_max_bytes: i32,
    /// Maximum bytes per partition; defaults to `1024 * 1024`.
    pub partition_max_bytes: i32,
    /// Offset reset policy; defaults to `AutoOffsetReset::Earliest`.
    pub auto_offset_reset: AutoOffsetReset,
    /// Isolation level; defaults to `IsolationLevel::ReadUncommitted`.
    pub isolation_level: IsolationLevel,
    /// Whether auto commit is enabled; defaults to `false`.
    pub enable_auto_commit: bool,
    /// Auto commit interval; defaults to 5 seconds.
    pub auto_commit_interval: Duration,
    /// Server-side assignor name; `None` by default.
    pub server_assignor: Option<String>,
    /// Rack id; `None` by default.
    pub rack_id: Option<String>,
    /// Instance id; `None` by default.
    pub instance_id: Option<String>,
    /// Rebalance listener; `None` by default.
    pub rebalance_listener: Option<ConsumerRebalanceListener>,
    /// TCP connector; defaults to `TokioTcpConnector`.
    pub tcp_connector: Arc<dyn TcpConnector>,
}
787
788impl ConsumerConfig {
789    /// Creates a new value.
790    pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
791        Self {
792            bootstrap_servers: vec![bootstrap_server.into()],
793            client_id: "rust-consumer".to_owned(),
794            group_id: group_id.into(),
795            security_protocol: SecurityProtocol::Plaintext,
796            tls: TlsConfig::default(),
797            sasl: SaslConfig::default(),
798            request_timeout: Duration::from_secs(5),
799            metadata_max_age: Duration::from_secs(30),
800            retry_backoff: Duration::from_millis(250),
801            max_retries: 3,
802            rebalance_timeout: Duration::from_secs(30),
803            fetch_max_wait: Duration::from_millis(500),
804            fetch_min_bytes: 1,
805            fetch_max_bytes: 50 * 1024 * 1024,
806            partition_max_bytes: 1024 * 1024,
807            auto_offset_reset: AutoOffsetReset::Earliest,
808            isolation_level: IsolationLevel::ReadUncommitted,
809            enable_auto_commit: false,
810            auto_commit_interval: Duration::from_secs(5),
811            server_assignor: None,
812            rack_id: None,
813            instance_id: None,
814            rebalance_listener: None,
815            tcp_connector: Arc::new(TokioTcpConnector),
816        }
817    }
818
819    /// Sets client id and returns the updated value.
820    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
821        self.client_id = client_id.into();
822        self
823    }
824
825    /// Sets bootstrap servers and returns the updated value.
826    pub fn with_bootstrap_servers(
827        mut self,
828        servers: impl IntoIterator<Item = impl Into<String>>,
829    ) -> Self {
830        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
831        self
832    }
833
834    /// Sets security protocol and returns the updated value.
835    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
836        self.security_protocol = security_protocol;
837        self
838    }
839
840    /// Sets tls and returns the updated value.
841    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
842        self.security_protocol = if self.security_protocol.uses_sasl() {
843            SecurityProtocol::SaslSsl
844        } else {
845            SecurityProtocol::Ssl
846        };
847        self.tls = tls;
848        self
849    }
850
851    /// Sets tls ca cert path and returns the updated value.
852    pub fn with_tls_ca_cert_path(mut self, path: impl Into<PathBuf>) -> Self {
853        self.security_protocol = if self.security_protocol.uses_sasl() {
854            SecurityProtocol::SaslSsl
855        } else {
856            SecurityProtocol::Ssl
857        };
858        self.tls = self.tls.with_ca_cert_path(path);
859        self
860    }
861
862    /// Sets tls client auth paths and returns the updated value.
863    pub fn with_tls_client_auth_paths(
864        mut self,
865        cert_path: impl Into<PathBuf>,
866        key_path: impl Into<PathBuf>,
867    ) -> Self {
868        self.security_protocol = if self.security_protocol.uses_sasl() {
869            SecurityProtocol::SaslSsl
870        } else {
871            SecurityProtocol::Ssl
872        };
873        self.tls = self
874            .tls
875            .with_client_cert_path(cert_path)
876            .with_client_key_path(key_path);
877        self
878    }
879
880    /// Sets tls server name and returns the updated value.
881    pub fn with_tls_server_name(mut self, server_name: impl Into<String>) -> Self {
882        self.security_protocol = if self.security_protocol.uses_sasl() {
883            SecurityProtocol::SaslSsl
884        } else {
885            SecurityProtocol::Ssl
886        };
887        self.tls = self.tls.with_server_name(server_name);
888        self
889    }
890
891    /// Sets sasl and returns the updated value.
892    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
893        self.security_protocol = self.security_protocol.with_sasl();
894        self.sasl = sasl;
895        self
896    }
897
898    /// Sets sasl plain and returns the updated value.
899    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
900        self.with_sasl(SaslConfig::plain(username, password))
901    }
902
903    /// Sets sasl scram sha 256 and returns the updated value.
904    pub fn with_sasl_scram_sha_256(
905        self,
906        username: impl Into<String>,
907        password: impl Into<String>,
908    ) -> Self {
909        self.with_sasl(SaslConfig::scram_sha_256(username, password))
910    }
911
912    /// Sets sasl scram sha 512 and returns the updated value.
913    pub fn with_sasl_scram_sha_512(
914        self,
915        username: impl Into<String>,
916        password: impl Into<String>,
917    ) -> Self {
918        self.with_sasl(SaslConfig::scram_sha_512(username, password))
919    }
920
921    /// Sets request timeout and returns the updated value.
922    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
923        self.request_timeout = request_timeout;
924        self
925    }
926
927    /// Sets metadata max age and returns the updated value.
928    pub fn with_metadata_max_age(mut self, metadata_max_age: Duration) -> Self {
929        self.metadata_max_age = metadata_max_age;
930        self
931    }
932
933    /// Sets retry backoff and returns the updated value.
934    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
935        self.retry_backoff = retry_backoff;
936        self
937    }
938
939    /// Sets max retries and returns the updated value.
940    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
941        self.max_retries = max_retries;
942        self
943    }
944
945    /// Sets rebalance timeout and returns the updated value.
946    pub fn with_rebalance_timeout(mut self, rebalance_timeout: Duration) -> Self {
947        self.rebalance_timeout = rebalance_timeout;
948        self
949    }
950
951    /// Sets fetch max wait and returns the updated value.
952    pub fn with_fetch_max_wait(mut self, fetch_max_wait: Duration) -> Self {
953        self.fetch_max_wait = fetch_max_wait;
954        self
955    }
956
957    /// Sets fetch min bytes and returns the updated value.
958    pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
959        self.fetch_min_bytes = fetch_min_bytes;
960        self
961    }
962
963    /// Sets fetch max bytes and returns the updated value.
964    pub fn with_fetch_max_bytes(mut self, fetch_max_bytes: i32) -> Self {
965        self.fetch_max_bytes = fetch_max_bytes;
966        self
967    }
968
969    /// Sets partition max bytes and returns the updated value.
970    pub fn with_partition_max_bytes(mut self, partition_max_bytes: i32) -> Self {
971        self.partition_max_bytes = partition_max_bytes;
972        self
973    }
974
975    /// Sets auto offset reset and returns the updated value.
976    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
977        self.auto_offset_reset = auto_offset_reset;
978        self
979    }
980
981    /// Sets isolation level and returns the updated value.
982    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
983        self.isolation_level = isolation_level;
984        self
985    }
986
987    /// Sets enable auto commit and returns the updated value.
988    pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
989        self.enable_auto_commit = enable_auto_commit;
990        self
991    }
992
993    /// Sets auto commit interval and returns the updated value.
994    pub fn with_auto_commit_interval(mut self, auto_commit_interval: Duration) -> Self {
995        self.auto_commit_interval = auto_commit_interval;
996        self
997    }
998
999    /// Sets server assignor and returns the updated value.
1000    pub fn with_server_assignor(mut self, server_assignor: impl Into<String>) -> Self {
1001        self.server_assignor = Some(server_assignor.into());
1002        self
1003    }
1004
1005    /// Sets rack id and returns the updated value.
1006    pub fn with_rack_id(mut self, rack_id: impl Into<String>) -> Self {
1007        self.rack_id = Some(rack_id.into());
1008        self
1009    }
1010
1011    /// Sets instance id and returns the updated value.
1012    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
1013        self.instance_id = Some(instance_id.into());
1014        self
1015    }
1016
1017    /// Sets rebalance listener and returns the updated value.
1018    pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
1019        self.rebalance_listener = Some(listener);
1020        self
1021    }
1022
1023    /// Sets rebalance callback and returns the updated value.
1024    pub fn with_rebalance_callback(
1025        self,
1026        callback: impl Fn(crate::types::ConsumerRebalanceEvent) + Send + Sync + 'static,
1027    ) -> Self {
1028        self.with_rebalance_listener(ConsumerRebalanceListener::new(callback))
1029    }
1030
1031    /// Sets TCP connector and returns the updated value.
1032    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
1033        self.tcp_connector = tcp_connector;
1034        self
1035    }
1036}
1037
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    /// Supplying a CA certificate path on a producer switches it to `Ssl`.
    #[test]
    fn producer_tls_builder_enables_ssl() {
        let ca_path = "/tmp/cluster-ca.pem";
        let producer = ProducerConfig::new("localhost:9093").with_tls_ca_cert_path(ca_path);

        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(producer.tls.ca_cert_path.as_deref(), Some(Path::new(ca_path)));
    }

    /// SASL/PLAIN without TLS yields `SaslPlaintext` and keeps the credentials.
    #[test]
    fn producer_sasl_plain_builder_enables_sasl_plaintext() {
        let producer = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");
        let sasl = &producer.sasl;

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(sasl.mechanism, SaslMechanism::Plain);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    /// The `Debug` rendering shows the username but never the password.
    #[test]
    fn sasl_config_debug_redacts_passwords() {
        let producer = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");

        let rendered = format!("{producer:?}");

        assert!(rendered.contains("user-a"));
        assert!(rendered.contains("<redacted>"));
        assert!(!rendered.contains("secret-a"));
    }

    /// SCRAM-SHA-256 without TLS yields `SaslPlaintext` and keeps the credentials.
    #[test]
    fn producer_sasl_scram_builder_enables_sasl_plaintext() {
        let producer =
            ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256("user-a", "secret-a");
        let sasl = &producer.sasl;

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(sasl.mechanism, SaslMechanism::ScramSha256);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    /// Combining TLS and SASL ends in `SaslSsl` regardless of builder order.
    #[test]
    fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
        let sasl_first = ProducerConfig::new("localhost:9093")
            .with_sasl_plain("user-a", "secret-a")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
        assert_eq!(sasl_first.security_protocol, SecurityProtocol::SaslSsl);

        let tls_first = ProducerConfig::new("localhost:9093")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
            .with_sasl_plain("user-a", "secret-a");
        assert_eq!(tls_first.security_protocol, SecurityProtocol::SaslSsl);
    }

    /// `with_bootstrap_servers` stores every broker address in order.
    #[test]
    fn producer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092", "host3:9092"];

        let producer = ProducerConfig::new("host1:9092").with_bootstrap_servers(brokers);

        assert_eq!(producer.bootstrap_servers, brokers.to_vec());
    }

    /// Every producer tuning and transaction builder lands in its field.
    #[test]
    fn producer_builder_records_tuning_and_transaction_options() {
        let producer = ProducerConfig::new("localhost:9092")
            .with_client_id("producer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_tls_server_name("kafka.internal")
            .with_acks(-1)
            .with_enable_idempotence(true)
            .with_partitioner(ProducerPartitioner::Default)
            .with_compression(ProducerCompression::Zstd)
            .with_batch_size(0)
            .with_buffer_memory(4096)
            .with_max_block(Duration::from_millis(250))
            .with_max_request_size(2048)
            .with_partitioner_ignore_keys(true)
            .with_linger(Duration::from_millis(50))
            .with_delivery_timeout(Duration::from_secs(10))
            .with_request_timeout(Duration::from_secs(2))
            .with_metadata_max_age(Duration::from_secs(60))
            .with_retry_backoff(Duration::from_millis(75))
            .with_max_retries(7)
            .with_transactional_id("tx-a")
            .with_transaction_timeout(Duration::from_secs(45));

        // Identity and transport security.
        assert_eq!(producer.client_id, "producer-a");
        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        let tls = &producer.tls;
        assert_eq!(tls.client_cert_path.as_deref(), Some(Path::new("/tmp/client.crt")));
        assert_eq!(tls.client_key_path.as_deref(), Some(Path::new("/tmp/client.key")));
        assert_eq!(tls.server_name.as_deref(), Some("kafka.internal"));

        // Delivery semantics and batching.
        assert_eq!(producer.acks, -1);
        assert!(producer.is_idempotent());
        assert_eq!(producer.compression, ProducerCompression::Zstd);
        // A requested batch size of zero is expected to be stored as one.
        assert_eq!(producer.batch_size, 1);
        assert_eq!(producer.buffer_memory, 4096);
        assert_eq!(producer.max_block, Duration::from_millis(250));
        assert_eq!(producer.max_request_size, 2048);
        assert!(producer.partitioner_ignore_keys);

        // Timing and retry knobs.
        assert_eq!(producer.linger, Duration::from_millis(50));
        assert_eq!(producer.delivery_timeout, Duration::from_secs(10));
        assert_eq!(producer.request_timeout, Duration::from_secs(2));
        assert_eq!(producer.metadata_max_age, Duration::from_secs(60));
        assert_eq!(producer.retry_backoff, Duration::from_millis(75));
        assert_eq!(producer.max_retries, 7);

        // Transaction settings.
        assert_eq!(producer.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(producer.transaction_timeout, Duration::from_secs(45));
        assert!(producer.is_transactional());
    }

    /// A `TlsConfig` with a server-name override is kept and enables `Ssl`.
    #[test]
    fn consumer_tls_builder_keeps_override_server_name() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");

        let consumer = ConsumerConfig::new("localhost:9093", "group-a").with_tls(tls);

        assert_eq!(consumer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(consumer.tls.server_name.as_deref(), Some("kafka.internal"));
    }

    /// `with_bootstrap_servers` stores every broker address on a consumer.
    #[test]
    fn consumer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092"];

        let consumer =
            ConsumerConfig::new("host1:9092", "group-a").with_bootstrap_servers(brokers);

        assert_eq!(consumer.bootstrap_servers, brokers.to_vec());
    }

    /// Client-auth paths on an admin config are recorded and enable `Ssl`.
    #[test]
    fn admin_tls_builder_records_client_auth_paths() {
        let (cert, key) = ("/tmp/client.crt", "/tmp/client.key");

        let admin = AdminConfig::new("localhost:9093").with_tls_client_auth_paths(cert, key);

        assert_eq!(admin.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(admin.tls.client_cert_path.as_deref(), Some(Path::new(cert)));
        assert_eq!(admin.tls.client_key_path.as_deref(), Some(Path::new(key)));
    }

    /// Covers the admin and consumer builders not exercised by other tests.
    #[test]
    fn admin_and_consumer_builders_record_remaining_options() {
        // Plaintext is requested explicitly first; the TLS and SASL builders
        // that follow are expected to promote it to `SaslSsl`.
        let admin_tls = TlsConfig::new().with_ca_cert_path("/tmp/ca.pem");
        let admin = AdminConfig::new("localhost:9092")
            .with_client_id("admin-a")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .with_security_protocol(SecurityProtocol::Plaintext)
            .with_tls(admin_tls)
            .with_sasl_scram_sha_512("user-a", "secret-a")
            .with_request_timeout(Duration::from_secs(8));

        assert_eq!(admin.bootstrap_servers, vec!["host-a:9092", "host-b:9092"]);
        assert_eq!(admin.client_id, "admin-a");
        assert_eq!(admin.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(admin.tls.ca_cert_path.as_deref(), Some(Path::new("/tmp/ca.pem")));
        assert_eq!(admin.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(admin.request_timeout, Duration::from_secs(8));

        let consumer = ConsumerConfig::new("localhost:9092", "group-a")
            .with_client_id("consumer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_ca_cert_path("/tmp/ca.pem")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_sasl_scram_sha_512("user-b", "secret-b")
            .with_request_timeout(Duration::from_secs(3))
            .with_metadata_max_age(Duration::from_secs(4))
            .with_retry_backoff(Duration::from_millis(5))
            .with_max_retries(6)
            .with_rebalance_timeout(Duration::from_secs(7))
            .with_fetch_max_wait(Duration::from_millis(8))
            .with_fetch_min_bytes(9)
            .with_fetch_max_bytes(10)
            .with_partition_max_bytes(11)
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(Duration::from_secs(12))
            .with_server_assignor("range")
            .with_rack_id("rack-a")
            .with_instance_id("instance-a");

        // Identity and security.
        assert_eq!(consumer.client_id, "consumer-a");
        assert_eq!(consumer.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(consumer.sasl.mechanism, SaslMechanism::ScramSha512);

        // Timing and retry knobs.
        assert_eq!(consumer.request_timeout, Duration::from_secs(3));
        assert_eq!(consumer.metadata_max_age, Duration::from_secs(4));
        assert_eq!(consumer.retry_backoff, Duration::from_millis(5));
        assert_eq!(consumer.max_retries, 6);
        assert_eq!(consumer.rebalance_timeout, Duration::from_secs(7));

        // Fetch sizing.
        assert_eq!(consumer.fetch_max_wait, Duration::from_millis(8));
        assert_eq!(consumer.fetch_min_bytes, 9);
        assert_eq!(consumer.fetch_max_bytes, 10);
        assert_eq!(consumer.partition_max_bytes, 11);

        // Offset handling.
        assert_eq!(consumer.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(consumer.isolation_level, IsolationLevel::ReadCommitted);
        assert!(consumer.enable_auto_commit);
        assert_eq!(consumer.auto_commit_interval, Duration::from_secs(12));

        // Group membership metadata.
        assert_eq!(consumer.server_assignor.as_deref(), Some("range"));
        assert_eq!(consumer.rack_id.as_deref(), Some("rack-a"));
        assert_eq!(consumer.instance_id.as_deref(), Some("instance-a"));
    }
}