Skip to main content

kafkit_client/
api.rs

//! Builder-style entry points for the public client API.
//!
//! ```no_run
//! # async fn example() -> kafkit_client::Result<()> {
//! use kafkit_client::KafkaClient;
//!
//! let client = KafkaClient::new("localhost:9092");
//! let producer = client.topic("orders").producer().connect().await?;
//! producer.send_value("created").await?;
//! producer.shutdown().await?;
//! # Ok(())
//! # }
//! ```
//!
15use std::sync::Arc;
16use std::time::Duration;
17
18use crate::admin::KafkaAdmin;
19use crate::consumer::KafkaConsumer;
20use crate::network::TcpConnector;
21use crate::producer::KafkaProducer;
22use crate::{
23    AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
24    ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig, ValidationError,
25};
26use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
27
/// Poll timeout given to every new [`ConsumerBuilder`]; callers can replace it with
/// [`ConsumerBuilder::with_poll_timeout`].
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
29
30#[derive(Debug, Clone)]
31/// A small builder facade around one or more bootstrap servers.
32pub struct KafkaClient {
33    bootstrap_servers: Vec<String>,
34}
35
36impl KafkaClient {
37    /// Creates a client facade from one bootstrap server.
38    pub fn new(bootstrap_server: impl Into<String>) -> Self {
39        Self {
40            bootstrap_servers: vec![bootstrap_server.into()],
41        }
42    }
43
44    /// Sets bootstrap servers and returns the updated value.
45    pub fn with_bootstrap_servers(
46        mut self,
47        servers: impl IntoIterator<Item = impl Into<String>>,
48    ) -> Self {
49        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
50        self
51    }
52
53    /// Returns a topic-scoped builder facade.
54    pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
55        KafkaTopic {
56            bootstrap_servers: self.bootstrap_servers.clone(),
57            topic: topic.into(),
58        }
59    }
60
61    /// Returns a multi-topic builder facade.
62    pub fn topics<I, S>(&self, topics: I) -> KafkaTopics
63    where
64        I: IntoIterator<Item = S>,
65        S: Into<String>,
66    {
67        KafkaTopics {
68            bootstrap_servers: self.bootstrap_servers.clone(),
69            topics: topics.into_iter().map(Into::into).collect(),
70        }
71    }
72
73    /// Starts building a producer.
74    pub fn producer(&self) -> ProducerBuilder {
75        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
76    }
77
78    /// Starts building an admin client.
79    pub fn admin(&self) -> AdminBuilder {
80        AdminBuilder::from_servers(self.bootstrap_servers.clone())
81    }
82
83    /// Starts building a group consumer.
84    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
85        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
86    }
87}
88
89#[derive(Debug, Clone)]
90/// A topic-scoped facade for producers and consumers.
91pub struct KafkaTopic {
92    bootstrap_servers: Vec<String>,
93    topic: String,
94}
95
96impl KafkaTopic {
97    /// Starts building a producer with this topic as the default.
98    pub fn producer(&self) -> ProducerBuilder {
99        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
100            .with_default_topic(self.topic.clone())
101    }
102
103    /// Starts building a consumer already subscribed to this topic.
104    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
105        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
106            .with_topic(self.topic.clone())
107    }
108}
109
110#[derive(Debug, Clone)]
111/// A multi-topic facade for consumers.
112pub struct KafkaTopics {
113    bootstrap_servers: Vec<String>,
114    topics: Vec<String>,
115}
116
117impl KafkaTopics {
118    /// Starts building a consumer already subscribed to these topics.
119    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
120        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
121            .with_topics(self.topics.clone())
122    }
123}
124
125#[derive(Debug, Clone)]
126/// Builder for a [`KafkaProducer`].
127pub struct ProducerBuilder {
128    config: ProducerConfig,
129    default_topic: Option<String>,
130    default_partition: Option<i32>,
131}
132
133impl ProducerBuilder {
134    fn new(bootstrap_server: impl Into<String>) -> Self {
135        Self {
136            config: ProducerConfig::new(bootstrap_server),
137            default_topic: None,
138            default_partition: None,
139        }
140    }
141
142    fn from_servers(servers: Vec<String>) -> Self {
143        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
144        builder.config = builder.config.with_bootstrap_servers(servers);
145        builder
146    }
147
148    /// Sets bootstrap servers and returns the updated value.
149    pub fn with_bootstrap_servers(
150        mut self,
151        servers: impl IntoIterator<Item = impl Into<String>>,
152    ) -> Self {
153        self.config = self.config.with_bootstrap_servers(servers);
154        self
155    }
156
157    /// Sets client id and returns the updated value.
158    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
159        self.config = self.config.with_client_id(client_id);
160        self
161    }
162
163    /// Sets security protocol and returns the updated value.
164    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
165        self.config = self.config.with_security_protocol(security_protocol);
166        self
167    }
168
169    /// Sets tls and returns the updated value.
170    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
171        self.config = self.config.with_tls(tls);
172        self
173    }
174
175    /// Sets sasl and returns the updated value.
176    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
177        self.config = self.config.with_sasl(sasl);
178        self
179    }
180
181    /// Sets sasl plain and returns the updated value.
182    pub fn with_sasl_plain(
183        mut self,
184        username: impl Into<String>,
185        password: impl Into<String>,
186    ) -> Self {
187        self.config = self.config.with_sasl_plain(username, password);
188        self
189    }
190
191    /// Sets sasl scram sha 256 and returns the updated value.
192    pub fn with_sasl_scram_sha_256(
193        mut self,
194        username: impl Into<String>,
195        password: impl Into<String>,
196    ) -> Self {
197        self.config = self.config.with_sasl_scram_sha_256(username, password);
198        self
199    }
200
201    /// Sets sasl scram sha 512 and returns the updated value.
202    pub fn with_sasl_scram_sha_512(
203        mut self,
204        username: impl Into<String>,
205        password: impl Into<String>,
206    ) -> Self {
207        self.config = self.config.with_sasl_scram_sha_512(username, password);
208        self
209    }
210
211    /// Sets compression and returns the updated value.
212    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
213        self.config = self.config.with_compression(compression);
214        self
215    }
216
217    /// Sets default topic and returns the updated value.
218    pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
219        self.default_topic = Some(topic.into());
220        self
221    }
222
223    /// Sets default partition and returns the updated value.
224    pub fn with_default_partition(mut self, partition: i32) -> Self {
225        self.default_partition = Some(partition);
226        self
227    }
228
229    /// Sets enable idempotence and returns the updated value.
230    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
231        self.config = self.config.with_enable_idempotence(enable_idempotence);
232        self
233    }
234
235    /// Sets batch size and returns the updated value.
236    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
237        self.config = self.config.with_batch_size(batch_size);
238        self
239    }
240
241    /// Sets total memory available to buffered records and returns the updated value.
242    pub fn with_buffer_memory(mut self, buffer_memory: usize) -> Self {
243        self.config = self.config.with_buffer_memory(buffer_memory);
244        self
245    }
246
247    /// Sets the maximum time producer calls may wait for metadata or buffer capacity.
248    pub fn with_max_block(mut self, max_block: Duration) -> Self {
249        self.config = self.config.with_max_block(max_block);
250        self
251    }
252
253    /// Sets maximum serialized Produce request size and returns the updated value.
254    pub fn with_max_request_size(mut self, max_request_size: usize) -> Self {
255        self.config = self.config.with_max_request_size(max_request_size);
256        self
257    }
258
259    /// Sets whether the default partitioner ignores record keys.
260    pub fn with_partitioner_ignore_keys(mut self, ignore_keys: bool) -> Self {
261        self.config = self.config.with_partitioner_ignore_keys(ignore_keys);
262        self
263    }
264
265    /// Sets linger and returns the updated value.
266    pub fn with_linger(mut self, linger: Duration) -> Self {
267        self.config = self.config.with_linger(linger);
268        self
269    }
270
271    /// Sets delivery timeout and returns the updated value.
272    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
273        self.config = self.config.with_delivery_timeout(delivery_timeout);
274        self
275    }
276
277    /// Sets request timeout and returns the updated value.
278    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
279        self.config = self.config.with_request_timeout(request_timeout);
280        self
281    }
282
283    /// Sets retry backoff and returns the updated value.
284    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
285        self.config = self.config.with_retry_backoff(retry_backoff);
286        self
287    }
288
289    /// Sets max retries and returns the updated value.
290    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
291        self.config = self.config.with_max_retries(max_retries);
292        self
293    }
294
295    /// Sets max in-flight requests per broker connection and returns the updated value.
296    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
297        self.config = self
298            .config
299            .with_max_in_flight_requests_per_connection(max_in_flight);
300        self
301    }
302
303    /// Sets transactional id and returns the updated value.
304    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
305        self.config = self.config.with_transactional_id(transactional_id);
306        self
307    }
308
309    /// Sets TCP connector and returns the updated value.
310    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
311        self.config = self.config.with_tcp_connector(tcp_connector);
312        self
313    }
314
315    /// Connects and returns a producer.
316    pub async fn connect(self) -> Result<KafkaProducer> {
317        let producer = KafkaProducer::connect(self.config).await?;
318        Ok(producer.with_defaults(self.default_topic, self.default_partition))
319    }
320}
321
322#[derive(Debug, Clone)]
323/// Builder for a [`KafkaAdmin`].
324pub struct AdminBuilder {
325    config: AdminConfig,
326}
327
328impl AdminBuilder {
329    fn new(bootstrap_server: impl Into<String>) -> Self {
330        Self {
331            config: AdminConfig::new(bootstrap_server),
332        }
333    }
334
335    fn from_servers(servers: Vec<String>) -> Self {
336        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
337        builder.config = builder.config.with_bootstrap_servers(servers);
338        builder
339    }
340
341    /// Sets bootstrap servers and returns the updated value.
342    pub fn with_bootstrap_servers(
343        mut self,
344        servers: impl IntoIterator<Item = impl Into<String>>,
345    ) -> Self {
346        self.config = self.config.with_bootstrap_servers(servers);
347        self
348    }
349
350    /// Sets client id and returns the updated value.
351    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
352        self.config = self.config.with_client_id(client_id);
353        self
354    }
355
356    /// Sets request timeout and returns the updated value.
357    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
358        self.config = self.config.with_request_timeout(request_timeout);
359        self
360    }
361
362    /// Sets security protocol and returns the updated value.
363    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
364        self.config = self.config.with_security_protocol(security_protocol);
365        self
366    }
367
368    /// Sets tls and returns the updated value.
369    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
370        self.config = self.config.with_tls(tls);
371        self
372    }
373
374    /// Sets sasl and returns the updated value.
375    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
376        self.config = self.config.with_sasl(sasl);
377        self
378    }
379
380    /// Sets sasl plain and returns the updated value.
381    pub fn with_sasl_plain(
382        mut self,
383        username: impl Into<String>,
384        password: impl Into<String>,
385    ) -> Self {
386        self.config = self.config.with_sasl_plain(username, password);
387        self
388    }
389
390    /// Sets sasl scram sha 256 and returns the updated value.
391    pub fn with_sasl_scram_sha_256(
392        mut self,
393        username: impl Into<String>,
394        password: impl Into<String>,
395    ) -> Self {
396        self.config = self.config.with_sasl_scram_sha_256(username, password);
397        self
398    }
399
400    /// Sets sasl scram sha 512 and returns the updated value.
401    pub fn with_sasl_scram_sha_512(
402        mut self,
403        username: impl Into<String>,
404        password: impl Into<String>,
405    ) -> Self {
406        self.config = self.config.with_sasl_scram_sha_512(username, password);
407        self
408    }
409
410    /// Sets TCP connector and returns the updated value.
411    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
412        self.config = self.config.with_tcp_connector(tcp_connector);
413        self
414    }
415
416    /// Connects and returns an admin client.
417    pub async fn connect(self) -> Result<KafkaAdmin> {
418        KafkaAdmin::connect(self.config).await
419    }
420}
421
422#[derive(Debug, Clone)]
423/// Builder for a [`KafkaConsumer`].
424pub struct ConsumerBuilder {
425    config: ConsumerConfig,
426    topics: Vec<String>,
427    poll_timeout: Duration,
428}
429
430impl ConsumerBuilder {
431    fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
432        Self {
433            config: ConsumerConfig::new(bootstrap_server, group_id),
434            topics: Vec::new(),
435            poll_timeout: DEFAULT_POLL_TIMEOUT,
436        }
437    }
438
439    fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
440        let first = servers.first().cloned().unwrap_or_default();
441        let mut builder = Self::new(first, group_id);
442        builder.config = builder.config.with_bootstrap_servers(servers);
443        builder
444    }
445
446    /// Sets bootstrap servers and returns the updated value.
447    pub fn with_bootstrap_servers(
448        mut self,
449        servers: impl IntoIterator<Item = impl Into<String>>,
450    ) -> Self {
451        self.config = self.config.with_bootstrap_servers(servers);
452        self
453    }
454
455    /// Sets client id and returns the updated value.
456    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
457        self.config = self.config.with_client_id(client_id);
458        self
459    }
460
461    /// Sets security protocol and returns the updated value.
462    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
463        self.config = self.config.with_security_protocol(security_protocol);
464        self
465    }
466
467    /// Sets tls and returns the updated value.
468    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
469        self.config = self.config.with_tls(tls);
470        self
471    }
472
473    /// Sets sasl and returns the updated value.
474    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
475        self.config = self.config.with_sasl(sasl);
476        self
477    }
478
479    /// Sets sasl plain and returns the updated value.
480    pub fn with_sasl_plain(
481        mut self,
482        username: impl Into<String>,
483        password: impl Into<String>,
484    ) -> Self {
485        self.config = self.config.with_sasl_plain(username, password);
486        self
487    }
488
489    /// Sets sasl scram sha 256 and returns the updated value.
490    pub fn with_sasl_scram_sha_256(
491        mut self,
492        username: impl Into<String>,
493        password: impl Into<String>,
494    ) -> Self {
495        self.config = self.config.with_sasl_scram_sha_256(username, password);
496        self
497    }
498
499    /// Sets sasl scram sha 512 and returns the updated value.
500    pub fn with_sasl_scram_sha_512(
501        mut self,
502        username: impl Into<String>,
503        password: impl Into<String>,
504    ) -> Self {
505        self.config = self.config.with_sasl_scram_sha_512(username, password);
506        self
507    }
508
509    /// Sets topic and returns the updated value.
510    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
511        self.topics.push(topic.into());
512        self
513    }
514
515    /// Sets topics and returns the updated value.
516    pub fn with_topics<I, S>(mut self, topics: I) -> Self
517    where
518        I: IntoIterator<Item = S>,
519        S: Into<String>,
520    {
521        self.topics.extend(topics.into_iter().map(Into::into));
522        self
523    }
524
525    /// Sets auto offset reset and returns the updated value.
526    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
527        self.config = self.config.with_auto_offset_reset(auto_offset_reset);
528        self
529    }
530
531    /// Sets isolation level and returns the updated value.
532    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
533        self.config = self.config.with_isolation_level(isolation_level);
534        self
535    }
536
537    /// Sets auto commit and returns the updated value.
538    pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
539        self.config = self.config.with_enable_auto_commit(enable_auto_commit);
540        self
541    }
542
543    /// Sets request timeout and returns the updated value.
544    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
545        self.config = self.config.with_request_timeout(request_timeout);
546        self
547    }
548
549    /// Sets retry backoff and returns the updated value.
550    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
551        self.config = self.config.with_retry_backoff(retry_backoff);
552        self
553    }
554
555    /// Sets max retries and returns the updated value.
556    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
557        self.config = self.config.with_max_retries(max_retries);
558        self
559    }
560
561    /// Sets instance id and returns the updated value.
562    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
563        self.config = self.config.with_instance_id(instance_id);
564        self
565    }
566
567    /// Sets rebalance listener and returns the updated value.
568    pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
569        self.config = self.config.with_rebalance_listener(listener);
570        self
571    }
572
573    /// Sets rebalance callback and returns the updated value.
574    pub fn with_rebalance_callback(
575        mut self,
576        callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
577    ) -> Self {
578        self.config = self.config.with_rebalance_callback(callback);
579        self
580    }
581
582    /// Sets TCP connector and returns the updated value.
583    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
584        self.config = self.config.with_tcp_connector(tcp_connector);
585        self
586    }
587
588    /// Sets poll timeout and returns the updated value.
589    pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
590        self.poll_timeout = poll_timeout;
591        self
592    }
593
594    /// Connects and returns a consumer.
595    pub async fn connect(self) -> Result<KafkaConsumer> {
596        let topics = self
597            .topics
598            .into_iter()
599            .map(validate_topic_name)
600            .collect::<Result<Vec<_>>>()?;
601        let consumer = KafkaConsumer::connect(self.config).await?;
602        if !topics.is_empty()
603            && let Err(error) = consumer.subscribe(topics).await
604        {
605            let _ = consumer.shutdown().await;
606            return Err(error);
607        }
608        Ok(consumer.with_default_poll_timeout(self.poll_timeout))
609    }
610}
611
612fn validate_topic_name(topic: String) -> Result<String> {
613    let topic = topic.trim();
614    if topic.is_empty() {
615        return Err(ValidationError::EmptyTopicName {
616            operation: "topic builder",
617        }
618        .into());
619    }
620    Ok(topic.to_owned())
621}
622
/// Unit tests for the builder facades. They inspect builder state only and never connect.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SaslMechanism;

    /// A producer built through a topic facade keeps the client's server list, the topic
    /// as its default, and every option set on the builder.
    #[test]
    fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
        let client =
            KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);

        let producer = client
            .topic("orders")
            .producer()
            .with_client_id("producer-a")
            .with_default_partition(2)
            .with_compression(ProducerCompression::Lz4)
            .with_batch_size(32)
            .with_buffer_memory(4096)
            .with_max_block(Duration::from_millis(250))
            .with_max_request_size(2048)
            .with_partitioner_ignore_keys(true)
            .with_linger(Duration::from_millis(10))
            .with_delivery_timeout(Duration::from_secs(3))
            .with_transactional_id("tx-a");

        assert_eq!(
            producer.config.bootstrap_servers,
            vec!["host-a:9092", "host-b:9092"]
        );
        assert_eq!(producer.config.client_id, "producer-a");
        assert_eq!(producer.config.compression, ProducerCompression::Lz4);
        assert_eq!(producer.config.batch_size, 32);
        assert_eq!(producer.config.buffer_memory, 4096);
        assert_eq!(producer.config.max_block, Duration::from_millis(250));
        assert_eq!(producer.config.max_request_size, 2048);
        assert!(producer.config.partitioner_ignore_keys);
        assert_eq!(producer.config.linger, Duration::from_millis(10));
        assert_eq!(producer.config.delivery_timeout, Duration::from_secs(3));
        assert_eq!(producer.config.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(producer.default_topic.as_deref(), Some("orders"));
        assert_eq!(producer.default_partition, Some(2));
    }

    /// Admin builder options reach the config; SCRAM credentials on top of `Ssl` are
    /// expected to read back as `SaslSsl`.
    #[test]
    fn admin_builder_forwards_security_and_timeout_options() {
        let builder = KafkaClient::new("host-a:9092")
            .admin()
            .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
            .with_client_id("admin-a")
            .with_request_timeout(Duration::from_secs(9))
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls(TlsConfig::new().with_server_name("kafka.internal"))
            .with_sasl_scram_sha_512("user-a", "secret-a");

        assert_eq!(
            builder.config.bootstrap_servers,
            vec!["host-b:9092", "host-c:9092"]
        );
        assert_eq!(builder.config.client_id, "admin-a");
        assert_eq!(builder.config.request_timeout, Duration::from_secs(9));
        assert_eq!(builder.config.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(
            builder.config.tls.server_name.as_deref(),
            Some("kafka.internal")
        );
        assert_eq!(builder.config.sasl.mechanism, SaslMechanism::ScramSha512);
    }

    /// Topics accumulate in call order (facade topic first) and group options reach the
    /// config.
    #[test]
    fn consumer_builder_collects_topics_and_group_options() {
        let builder = KafkaClient::new("host-a:9092")
            .topic("orders")
            .consumer("group-a")
            .with_bootstrap_servers(["host-b:9092"])
            .with_client_id("consumer-a")
            .with_topic("payments")
            .with_topics(["shipments", "invoices"])
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_auto_commit(true)
            .with_instance_id("instance-a")
            .with_poll_timeout(Duration::from_millis(250))
            .with_sasl_plain("user-a", "secret-a");

        assert_eq!(builder.config.bootstrap_servers, vec!["host-b:9092"]);
        assert_eq!(builder.config.client_id, "consumer-a");
        assert_eq!(builder.config.group_id, "group-a");
        assert_eq!(
            builder.topics,
            vec!["orders", "payments", "shipments", "invoices"]
        );
        assert_eq!(builder.config.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(
            builder.config.isolation_level,
            IsolationLevel::ReadCommitted
        );
        assert!(builder.config.enable_auto_commit);
        assert_eq!(builder.config.instance_id.as_deref(), Some("instance-a"));
        assert_eq!(builder.poll_timeout, Duration::from_millis(250));
        assert_eq!(
            builder.config.security_protocol,
            SecurityProtocol::SaslPlaintext
        );
    }

    /// The multi-topic facade seeds the consumer's topic list, and later topics are
    /// appended to it.
    #[test]
    fn topics_facade_collects_multi_topic_consumer_defaults() {
        let builder = KafkaClient::new("host-a:9092")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .topics(["orders", "payments"])
            .consumer("group-a")
            .with_topic("shipments");

        assert_eq!(
            builder.config.bootstrap_servers,
            vec!["host-a:9092", "host-b:9092"]
        );
        assert_eq!(builder.config.group_id, "group-a");
        assert_eq!(builder.topics, vec!["orders", "payments", "shipments"]);
    }

    /// Topic names are trimmed, and whitespace-only names are rejected.
    #[test]
    fn validate_topic_name_trims_and_rejects_empty_names() {
        assert_eq!(
            validate_topic_name("  topic-a  ".to_owned()).unwrap(),
            "topic-a"
        );
        assert!(matches!(
            validate_topic_name("   ".to_owned()),
            Err(crate::Error::Validation(ValidationError::EmptyTopicName {
                operation: "topic builder"
            }))
        ));
    }
}
757}