Skip to main content

kafkit_client/
api.rs

1//! Builder-style entry points for the public client API.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::KafkaClient;
6//!
7//! let client = KafkaClient::new("localhost:9092");
8//! let producer = client.topic("orders").producer().connect().await?;
9//! producer.send_value("created").await?;
10//! producer.shutdown().await?;
11//! # Ok(())
12//! # }
13//! ```
14//!
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19
20use crate::admin::KafkaAdmin;
21use crate::consumer::KafkaConsumer;
22use crate::network::TcpConnector;
23use crate::producer::KafkaProducer;
24use crate::{
25    AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
26    ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
27};
28use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
29
30const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
31
32#[derive(Debug, Clone)]
33/// A small builder facade around one or more bootstrap servers.
34pub struct KafkaClient {
35    bootstrap_servers: Vec<String>,
36}
37
38impl KafkaClient {
39    /// Creates a client facade from one bootstrap server.
40    pub fn new(bootstrap_server: impl Into<String>) -> Self {
41        Self {
42            bootstrap_servers: vec![bootstrap_server.into()],
43        }
44    }
45
46    /// Sets bootstrap servers and returns the updated value.
47    pub fn with_bootstrap_servers(
48        mut self,
49        servers: impl IntoIterator<Item = impl Into<String>>,
50    ) -> Self {
51        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
52        self
53    }
54
55    /// Returns a topic-scoped builder facade.
56    pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
57        KafkaTopic {
58            bootstrap_servers: self.bootstrap_servers.clone(),
59            topic: topic.into(),
60        }
61    }
62
63    /// Returns a multi-topic builder facade.
64    pub fn topics<I, S>(&self, topics: I) -> KafkaTopics
65    where
66        I: IntoIterator<Item = S>,
67        S: Into<String>,
68    {
69        KafkaTopics {
70            bootstrap_servers: self.bootstrap_servers.clone(),
71            topics: topics.into_iter().map(Into::into).collect(),
72        }
73    }
74
75    /// Starts building a producer.
76    pub fn producer(&self) -> ProducerBuilder {
77        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
78    }
79
80    /// Starts building an admin client.
81    pub fn admin(&self) -> AdminBuilder {
82        AdminBuilder::from_servers(self.bootstrap_servers.clone())
83    }
84
85    /// Starts building a group consumer.
86    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
87        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
88    }
89}
90
91#[derive(Debug, Clone)]
92/// A topic-scoped facade for producers and consumers.
93pub struct KafkaTopic {
94    bootstrap_servers: Vec<String>,
95    topic: String,
96}
97
98impl KafkaTopic {
99    /// Starts building a producer with this topic as the default.
100    pub fn producer(&self) -> ProducerBuilder {
101        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
102            .with_default_topic(self.topic.clone())
103    }
104
105    /// Starts building a consumer already subscribed to this topic.
106    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
107        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
108            .with_topic(self.topic.clone())
109    }
110}
111
112#[derive(Debug, Clone)]
113/// A multi-topic facade for consumers.
114pub struct KafkaTopics {
115    bootstrap_servers: Vec<String>,
116    topics: Vec<String>,
117}
118
119impl KafkaTopics {
120    /// Starts building a consumer already subscribed to these topics.
121    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
122        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
123            .with_topics(self.topics.clone())
124    }
125}
126
127#[derive(Debug, Clone)]
128/// Builder for a [`KafkaProducer`].
129pub struct ProducerBuilder {
130    config: ProducerConfig,
131    default_topic: Option<String>,
132    default_partition: Option<i32>,
133}
134
135impl ProducerBuilder {
136    fn new(bootstrap_server: impl Into<String>) -> Self {
137        Self {
138            config: ProducerConfig::new(bootstrap_server),
139            default_topic: None,
140            default_partition: None,
141        }
142    }
143
144    fn from_servers(servers: Vec<String>) -> Self {
145        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
146        builder.config = builder.config.with_bootstrap_servers(servers);
147        builder
148    }
149
150    /// Sets bootstrap servers and returns the updated value.
151    pub fn with_bootstrap_servers(
152        mut self,
153        servers: impl IntoIterator<Item = impl Into<String>>,
154    ) -> Self {
155        self.config = self.config.with_bootstrap_servers(servers);
156        self
157    }
158
159    /// Sets client id and returns the updated value.
160    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
161        self.config = self.config.with_client_id(client_id);
162        self
163    }
164
165    /// Sets security protocol and returns the updated value.
166    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
167        self.config = self.config.with_security_protocol(security_protocol);
168        self
169    }
170
171    /// Sets tls and returns the updated value.
172    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
173        self.config = self.config.with_tls(tls);
174        self
175    }
176
177    /// Sets sasl and returns the updated value.
178    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
179        self.config = self.config.with_sasl(sasl);
180        self
181    }
182
183    /// Sets sasl plain and returns the updated value.
184    pub fn with_sasl_plain(
185        mut self,
186        username: impl Into<String>,
187        password: impl Into<String>,
188    ) -> Self {
189        self.config = self.config.with_sasl_plain(username, password);
190        self
191    }
192
193    /// Sets sasl scram sha 256 and returns the updated value.
194    pub fn with_sasl_scram_sha_256(
195        mut self,
196        username: impl Into<String>,
197        password: impl Into<String>,
198    ) -> Self {
199        self.config = self.config.with_sasl_scram_sha_256(username, password);
200        self
201    }
202
203    /// Sets sasl scram sha 512 and returns the updated value.
204    pub fn with_sasl_scram_sha_512(
205        mut self,
206        username: impl Into<String>,
207        password: impl Into<String>,
208    ) -> Self {
209        self.config = self.config.with_sasl_scram_sha_512(username, password);
210        self
211    }
212
213    /// Sets compression and returns the updated value.
214    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
215        self.config = self.config.with_compression(compression);
216        self
217    }
218
219    /// Sets default topic and returns the updated value.
220    pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
221        self.default_topic = Some(topic.into());
222        self
223    }
224
225    /// Sets default partition and returns the updated value.
226    pub fn with_default_partition(mut self, partition: i32) -> Self {
227        self.default_partition = Some(partition);
228        self
229    }
230
231    /// Sets enable idempotence and returns the updated value.
232    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
233        self.config = self.config.with_enable_idempotence(enable_idempotence);
234        self
235    }
236
237    /// Sets batch size and returns the updated value.
238    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
239        self.config = self.config.with_batch_size(batch_size);
240        self
241    }
242
243    /// Sets linger and returns the updated value.
244    pub fn with_linger(mut self, linger: Duration) -> Self {
245        self.config = self.config.with_linger(linger);
246        self
247    }
248
249    /// Sets delivery timeout and returns the updated value.
250    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
251        self.config = self.config.with_delivery_timeout(delivery_timeout);
252        self
253    }
254
255    /// Sets request timeout and returns the updated value.
256    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
257        self.config = self.config.with_request_timeout(request_timeout);
258        self
259    }
260
261    /// Sets retry backoff and returns the updated value.
262    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
263        self.config = self.config.with_retry_backoff(retry_backoff);
264        self
265    }
266
267    /// Sets max retries and returns the updated value.
268    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
269        self.config = self.config.with_max_retries(max_retries);
270        self
271    }
272
273    /// Sets max in-flight requests per broker connection and returns the updated value.
274    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
275        self.config = self
276            .config
277            .with_max_in_flight_requests_per_connection(max_in_flight);
278        self
279    }
280
281    /// Sets transactional id and returns the updated value.
282    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
283        self.config = self.config.with_transactional_id(transactional_id);
284        self
285    }
286
287    /// Sets TCP connector and returns the updated value.
288    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
289        self.config = self.config.with_tcp_connector(tcp_connector);
290        self
291    }
292
293    /// Connects and returns a producer.
294    pub async fn connect(self) -> Result<KafkaProducer> {
295        let producer = KafkaProducer::connect(self.config).await?;
296        Ok(producer.with_defaults(self.default_topic, self.default_partition))
297    }
298}
299
300#[derive(Debug, Clone)]
301/// Builder for a [`KafkaAdmin`].
302pub struct AdminBuilder {
303    config: AdminConfig,
304}
305
306impl AdminBuilder {
307    fn new(bootstrap_server: impl Into<String>) -> Self {
308        Self {
309            config: AdminConfig::new(bootstrap_server),
310        }
311    }
312
313    fn from_servers(servers: Vec<String>) -> Self {
314        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
315        builder.config = builder.config.with_bootstrap_servers(servers);
316        builder
317    }
318
319    /// Sets bootstrap servers and returns the updated value.
320    pub fn with_bootstrap_servers(
321        mut self,
322        servers: impl IntoIterator<Item = impl Into<String>>,
323    ) -> Self {
324        self.config = self.config.with_bootstrap_servers(servers);
325        self
326    }
327
328    /// Sets client id and returns the updated value.
329    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
330        self.config = self.config.with_client_id(client_id);
331        self
332    }
333
334    /// Sets request timeout and returns the updated value.
335    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
336        self.config = self.config.with_request_timeout(request_timeout);
337        self
338    }
339
340    /// Sets security protocol and returns the updated value.
341    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
342        self.config = self.config.with_security_protocol(security_protocol);
343        self
344    }
345
346    /// Sets tls and returns the updated value.
347    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
348        self.config = self.config.with_tls(tls);
349        self
350    }
351
352    /// Sets sasl and returns the updated value.
353    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
354        self.config = self.config.with_sasl(sasl);
355        self
356    }
357
358    /// Sets sasl plain and returns the updated value.
359    pub fn with_sasl_plain(
360        mut self,
361        username: impl Into<String>,
362        password: impl Into<String>,
363    ) -> Self {
364        self.config = self.config.with_sasl_plain(username, password);
365        self
366    }
367
368    /// Sets sasl scram sha 256 and returns the updated value.
369    pub fn with_sasl_scram_sha_256(
370        mut self,
371        username: impl Into<String>,
372        password: impl Into<String>,
373    ) -> Self {
374        self.config = self.config.with_sasl_scram_sha_256(username, password);
375        self
376    }
377
378    /// Sets sasl scram sha 512 and returns the updated value.
379    pub fn with_sasl_scram_sha_512(
380        mut self,
381        username: impl Into<String>,
382        password: impl Into<String>,
383    ) -> Self {
384        self.config = self.config.with_sasl_scram_sha_512(username, password);
385        self
386    }
387
388    /// Sets TCP connector and returns the updated value.
389    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
390        self.config = self.config.with_tcp_connector(tcp_connector);
391        self
392    }
393
394    /// Connects and returns an admin client.
395    pub async fn connect(self) -> Result<KafkaAdmin> {
396        KafkaAdmin::connect(self.config).await
397    }
398}
399
400#[derive(Debug, Clone)]
401/// Builder for a [`KafkaConsumer`].
402pub struct ConsumerBuilder {
403    config: ConsumerConfig,
404    topics: Vec<String>,
405    poll_timeout: Duration,
406}
407
408impl ConsumerBuilder {
409    fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
410        Self {
411            config: ConsumerConfig::new(bootstrap_server, group_id),
412            topics: Vec::new(),
413            poll_timeout: DEFAULT_POLL_TIMEOUT,
414        }
415    }
416
417    fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
418        let first = servers.first().cloned().unwrap_or_default();
419        let mut builder = Self::new(first, group_id);
420        builder.config = builder.config.with_bootstrap_servers(servers);
421        builder
422    }
423
424    /// Sets bootstrap servers and returns the updated value.
425    pub fn with_bootstrap_servers(
426        mut self,
427        servers: impl IntoIterator<Item = impl Into<String>>,
428    ) -> Self {
429        self.config = self.config.with_bootstrap_servers(servers);
430        self
431    }
432
433    /// Sets client id and returns the updated value.
434    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
435        self.config = self.config.with_client_id(client_id);
436        self
437    }
438
439    /// Sets security protocol and returns the updated value.
440    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
441        self.config = self.config.with_security_protocol(security_protocol);
442        self
443    }
444
445    /// Sets tls and returns the updated value.
446    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
447        self.config = self.config.with_tls(tls);
448        self
449    }
450
451    /// Sets sasl and returns the updated value.
452    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
453        self.config = self.config.with_sasl(sasl);
454        self
455    }
456
457    /// Sets sasl plain and returns the updated value.
458    pub fn with_sasl_plain(
459        mut self,
460        username: impl Into<String>,
461        password: impl Into<String>,
462    ) -> Self {
463        self.config = self.config.with_sasl_plain(username, password);
464        self
465    }
466
467    /// Sets sasl scram sha 256 and returns the updated value.
468    pub fn with_sasl_scram_sha_256(
469        mut self,
470        username: impl Into<String>,
471        password: impl Into<String>,
472    ) -> Self {
473        self.config = self.config.with_sasl_scram_sha_256(username, password);
474        self
475    }
476
477    /// Sets sasl scram sha 512 and returns the updated value.
478    pub fn with_sasl_scram_sha_512(
479        mut self,
480        username: impl Into<String>,
481        password: impl Into<String>,
482    ) -> Self {
483        self.config = self.config.with_sasl_scram_sha_512(username, password);
484        self
485    }
486
487    /// Sets topic and returns the updated value.
488    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
489        self.topics.push(topic.into());
490        self
491    }
492
493    /// Sets topics and returns the updated value.
494    pub fn with_topics<I, S>(mut self, topics: I) -> Self
495    where
496        I: IntoIterator<Item = S>,
497        S: Into<String>,
498    {
499        self.topics.extend(topics.into_iter().map(Into::into));
500        self
501    }
502
503    /// Sets auto offset reset and returns the updated value.
504    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
505        self.config = self.config.with_auto_offset_reset(auto_offset_reset);
506        self
507    }
508
509    /// Sets isolation level and returns the updated value.
510    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
511        self.config = self.config.with_isolation_level(isolation_level);
512        self
513    }
514
515    /// Sets auto commit and returns the updated value.
516    pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
517        self.config = self.config.with_enable_auto_commit(enable_auto_commit);
518        self
519    }
520
521    /// Sets request timeout and returns the updated value.
522    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
523        self.config = self.config.with_request_timeout(request_timeout);
524        self
525    }
526
527    /// Sets retry backoff and returns the updated value.
528    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
529        self.config = self.config.with_retry_backoff(retry_backoff);
530        self
531    }
532
533    /// Sets max retries and returns the updated value.
534    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
535        self.config = self.config.with_max_retries(max_retries);
536        self
537    }
538
539    /// Sets instance id and returns the updated value.
540    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
541        self.config = self.config.with_instance_id(instance_id);
542        self
543    }
544
545    /// Sets rebalance listener and returns the updated value.
546    pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
547        self.config = self.config.with_rebalance_listener(listener);
548        self
549    }
550
551    /// Sets rebalance callback and returns the updated value.
552    pub fn with_rebalance_callback(
553        mut self,
554        callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
555    ) -> Self {
556        self.config = self.config.with_rebalance_callback(callback);
557        self
558    }
559
560    /// Sets TCP connector and returns the updated value.
561    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
562        self.config = self.config.with_tcp_connector(tcp_connector);
563        self
564    }
565
566    /// Sets poll timeout and returns the updated value.
567    pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
568        self.poll_timeout = poll_timeout;
569        self
570    }
571
572    /// Connects and returns a consumer.
573    pub async fn connect(self) -> Result<KafkaConsumer> {
574        let topics = self
575            .topics
576            .into_iter()
577            .map(validate_topic_name)
578            .collect::<Result<Vec<_>>>()?;
579        let consumer = KafkaConsumer::connect(self.config).await?;
580        if !topics.is_empty()
581            && let Err(error) = consumer.subscribe(topics).await
582        {
583            let _ = consumer.shutdown().await;
584            return Err(error);
585        }
586        Ok(consumer.with_default_poll_timeout(self.poll_timeout))
587    }
588}
589
590fn validate_topic_name(topic: String) -> Result<String> {
591    let topic = topic.trim();
592    if topic.is_empty() {
593        return Err(anyhow!("topic must be non-empty").into());
594    }
595    Ok(topic.to_owned())
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use crate::SaslMechanism;
602
603    #[test]
604    fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
605        let client =
606            KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);
607
608        let producer = client
609            .topic("orders")
610            .producer()
611            .with_client_id("producer-a")
612            .with_default_partition(2)
613            .with_compression(ProducerCompression::Lz4)
614            .with_batch_size(32)
615            .with_linger(Duration::from_millis(10))
616            .with_delivery_timeout(Duration::from_secs(3))
617            .with_transactional_id("tx-a");
618
619        assert_eq!(
620            producer.config.bootstrap_servers,
621            vec!["host-a:9092", "host-b:9092"]
622        );
623        assert_eq!(producer.config.client_id, "producer-a");
624        assert_eq!(producer.config.compression, ProducerCompression::Lz4);
625        assert_eq!(producer.config.batch_size, 32);
626        assert_eq!(producer.config.linger, Duration::from_millis(10));
627        assert_eq!(producer.config.delivery_timeout, Duration::from_secs(3));
628        assert_eq!(producer.config.transactional_id.as_deref(), Some("tx-a"));
629        assert_eq!(producer.default_topic.as_deref(), Some("orders"));
630        assert_eq!(producer.default_partition, Some(2));
631    }
632
633    #[test]
634    fn admin_builder_forwards_security_and_timeout_options() {
635        let builder = KafkaClient::new("host-a:9092")
636            .admin()
637            .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
638            .with_client_id("admin-a")
639            .with_request_timeout(Duration::from_secs(9))
640            .with_security_protocol(SecurityProtocol::Ssl)
641            .with_tls(TlsConfig::new().with_server_name("kafka.internal"))
642            .with_sasl_scram_sha_512("user-a", "secret-a");
643
644        assert_eq!(
645            builder.config.bootstrap_servers,
646            vec!["host-b:9092", "host-c:9092"]
647        );
648        assert_eq!(builder.config.client_id, "admin-a");
649        assert_eq!(builder.config.request_timeout, Duration::from_secs(9));
650        assert_eq!(builder.config.security_protocol, SecurityProtocol::SaslSsl);
651        assert_eq!(
652            builder.config.tls.server_name.as_deref(),
653            Some("kafka.internal")
654        );
655        assert_eq!(builder.config.sasl.mechanism, SaslMechanism::ScramSha512);
656    }
657
658    #[test]
659    fn consumer_builder_collects_topics_and_group_options() {
660        let builder = KafkaClient::new("host-a:9092")
661            .topic("orders")
662            .consumer("group-a")
663            .with_bootstrap_servers(["host-b:9092"])
664            .with_client_id("consumer-a")
665            .with_topic("payments")
666            .with_topics(["shipments", "invoices"])
667            .with_auto_offset_reset(AutoOffsetReset::Latest)
668            .with_isolation_level(IsolationLevel::ReadCommitted)
669            .with_auto_commit(true)
670            .with_instance_id("instance-a")
671            .with_poll_timeout(Duration::from_millis(250))
672            .with_sasl_plain("user-a", "secret-a");
673
674        assert_eq!(builder.config.bootstrap_servers, vec!["host-b:9092"]);
675        assert_eq!(builder.config.client_id, "consumer-a");
676        assert_eq!(builder.config.group_id, "group-a");
677        assert_eq!(
678            builder.topics,
679            vec!["orders", "payments", "shipments", "invoices"]
680        );
681        assert_eq!(builder.config.auto_offset_reset, AutoOffsetReset::Latest);
682        assert_eq!(
683            builder.config.isolation_level,
684            IsolationLevel::ReadCommitted
685        );
686        assert!(builder.config.enable_auto_commit);
687        assert_eq!(builder.config.instance_id.as_deref(), Some("instance-a"));
688        assert_eq!(builder.poll_timeout, Duration::from_millis(250));
689        assert_eq!(
690            builder.config.security_protocol,
691            SecurityProtocol::SaslPlaintext
692        );
693    }
694
695    #[test]
696    fn topics_facade_collects_multi_topic_consumer_defaults() {
697        let builder = KafkaClient::new("host-a:9092")
698            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
699            .topics(["orders", "payments"])
700            .consumer("group-a")
701            .with_topic("shipments");
702
703        assert_eq!(
704            builder.config.bootstrap_servers,
705            vec!["host-a:9092", "host-b:9092"]
706        );
707        assert_eq!(builder.config.group_id, "group-a");
708        assert_eq!(builder.topics, vec!["orders", "payments", "shipments"]);
709    }
710
711    #[test]
712    fn validate_topic_name_trims_and_rejects_empty_names() {
713        assert_eq!(
714            validate_topic_name("  topic-a  ".to_owned()).unwrap(),
715            "topic-a"
716        );
717        assert!(validate_topic_name("   ".to_owned()).is_err());
718    }
719}