Skip to main content

kafkit_client/
api.rs

1//! Builder-style entry points for the public client API.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::KafkaClient;
6//!
7//! let client = KafkaClient::new("localhost:9092");
8//! let producer = client.topic("orders").producer().connect().await?;
9//! producer.send_value("created").await?;
10//! producer.shutdown().await?;
11//! # Ok(())
12//! # }
13//! ```
14//!
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19
20use crate::admin::KafkaAdmin;
21use crate::consumer::KafkaConsumer;
22use crate::network::TcpConnector;
23use crate::producer::KafkaProducer;
24use crate::{
25    AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
26    ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
27};
28use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
29
30const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
31
32#[derive(Debug, Clone)]
33/// A small builder facade around one or more bootstrap servers.
34pub struct KafkaClient {
35    bootstrap_servers: Vec<String>,
36}
37
38impl KafkaClient {
39    /// Creates a client facade from one bootstrap server.
40    pub fn new(bootstrap_server: impl Into<String>) -> Self {
41        Self {
42            bootstrap_servers: vec![bootstrap_server.into()],
43        }
44    }
45
46    /// Sets bootstrap servers and returns the updated value.
47    pub fn with_bootstrap_servers(
48        mut self,
49        servers: impl IntoIterator<Item = impl Into<String>>,
50    ) -> Self {
51        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
52        self
53    }
54
55    /// Returns a topic-scoped builder facade.
56    pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
57        KafkaTopic {
58            bootstrap_servers: self.bootstrap_servers.clone(),
59            topic: topic.into(),
60        }
61    }
62
63    /// Starts building a producer.
64    pub fn producer(&self) -> ProducerBuilder {
65        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
66    }
67
68    /// Starts building an admin client.
69    pub fn admin(&self) -> AdminBuilder {
70        AdminBuilder::from_servers(self.bootstrap_servers.clone())
71    }
72
73    /// Starts building a group consumer.
74    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
75        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
76    }
77}
78
79#[derive(Debug, Clone)]
80/// A topic-scoped facade for producers and consumers.
81pub struct KafkaTopic {
82    bootstrap_servers: Vec<String>,
83    topic: String,
84}
85
86impl KafkaTopic {
87    /// Starts building a producer with this topic as the default.
88    pub fn producer(&self) -> ProducerBuilder {
89        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
90            .with_default_topic(self.topic.clone())
91    }
92
93    /// Starts building a consumer already subscribed to this topic.
94    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
95        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
96            .with_topic(self.topic.clone())
97    }
98}
99
100#[derive(Debug, Clone)]
101/// Builder for a [`KafkaProducer`].
102pub struct ProducerBuilder {
103    config: ProducerConfig,
104    default_topic: Option<String>,
105    default_partition: Option<i32>,
106}
107
108impl ProducerBuilder {
109    fn new(bootstrap_server: impl Into<String>) -> Self {
110        Self {
111            config: ProducerConfig::new(bootstrap_server),
112            default_topic: None,
113            default_partition: None,
114        }
115    }
116
117    fn from_servers(servers: Vec<String>) -> Self {
118        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
119        builder.config = builder.config.with_bootstrap_servers(servers);
120        builder
121    }
122
123    /// Sets bootstrap servers and returns the updated value.
124    pub fn with_bootstrap_servers(
125        mut self,
126        servers: impl IntoIterator<Item = impl Into<String>>,
127    ) -> Self {
128        self.config = self.config.with_bootstrap_servers(servers);
129        self
130    }
131
132    /// Sets client id and returns the updated value.
133    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
134        self.config = self.config.with_client_id(client_id);
135        self
136    }
137
138    /// Sets security protocol and returns the updated value.
139    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
140        self.config = self.config.with_security_protocol(security_protocol);
141        self
142    }
143
144    /// Sets tls and returns the updated value.
145    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
146        self.config = self.config.with_tls(tls);
147        self
148    }
149
150    /// Sets sasl and returns the updated value.
151    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
152        self.config = self.config.with_sasl(sasl);
153        self
154    }
155
156    /// Sets sasl plain and returns the updated value.
157    pub fn with_sasl_plain(
158        mut self,
159        username: impl Into<String>,
160        password: impl Into<String>,
161    ) -> Self {
162        self.config = self.config.with_sasl_plain(username, password);
163        self
164    }
165
166    /// Sets sasl scram sha 256 and returns the updated value.
167    pub fn with_sasl_scram_sha_256(
168        mut self,
169        username: impl Into<String>,
170        password: impl Into<String>,
171    ) -> Self {
172        self.config = self.config.with_sasl_scram_sha_256(username, password);
173        self
174    }
175
176    /// Sets sasl scram sha 512 and returns the updated value.
177    pub fn with_sasl_scram_sha_512(
178        mut self,
179        username: impl Into<String>,
180        password: impl Into<String>,
181    ) -> Self {
182        self.config = self.config.with_sasl_scram_sha_512(username, password);
183        self
184    }
185
186    /// Sets compression and returns the updated value.
187    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
188        self.config = self.config.with_compression(compression);
189        self
190    }
191
192    /// Sets default topic and returns the updated value.
193    pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
194        self.default_topic = Some(topic.into());
195        self
196    }
197
198    /// Sets default partition and returns the updated value.
199    pub fn with_default_partition(mut self, partition: i32) -> Self {
200        self.default_partition = Some(partition);
201        self
202    }
203
204    /// Sets enable idempotence and returns the updated value.
205    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
206        self.config = self.config.with_enable_idempotence(enable_idempotence);
207        self
208    }
209
210    /// Sets batch size and returns the updated value.
211    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
212        self.config = self.config.with_batch_size(batch_size);
213        self
214    }
215
216    /// Sets linger and returns the updated value.
217    pub fn with_linger(mut self, linger: Duration) -> Self {
218        self.config = self.config.with_linger(linger);
219        self
220    }
221
222    /// Sets delivery timeout and returns the updated value.
223    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
224        self.config = self.config.with_delivery_timeout(delivery_timeout);
225        self
226    }
227
228    /// Sets request timeout and returns the updated value.
229    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
230        self.config = self.config.with_request_timeout(request_timeout);
231        self
232    }
233
234    /// Sets retry backoff and returns the updated value.
235    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
236        self.config = self.config.with_retry_backoff(retry_backoff);
237        self
238    }
239
240    /// Sets max retries and returns the updated value.
241    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
242        self.config = self.config.with_max_retries(max_retries);
243        self
244    }
245
246    /// Sets max in-flight requests per broker connection and returns the updated value.
247    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
248        self.config = self
249            .config
250            .with_max_in_flight_requests_per_connection(max_in_flight);
251        self
252    }
253
254    /// Sets transactional id and returns the updated value.
255    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
256        self.config = self.config.with_transactional_id(transactional_id);
257        self
258    }
259
260    /// Sets TCP connector and returns the updated value.
261    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
262        self.config = self.config.with_tcp_connector(tcp_connector);
263        self
264    }
265
266    /// Connects and returns a producer.
267    pub async fn connect(self) -> Result<KafkaProducer> {
268        let producer = KafkaProducer::connect(self.config).await?;
269        Ok(producer.with_defaults(self.default_topic, self.default_partition))
270    }
271}
272
273#[derive(Debug, Clone)]
274/// Builder for a [`KafkaAdmin`].
275pub struct AdminBuilder {
276    config: AdminConfig,
277}
278
279impl AdminBuilder {
280    fn new(bootstrap_server: impl Into<String>) -> Self {
281        Self {
282            config: AdminConfig::new(bootstrap_server),
283        }
284    }
285
286    fn from_servers(servers: Vec<String>) -> Self {
287        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
288        builder.config = builder.config.with_bootstrap_servers(servers);
289        builder
290    }
291
292    /// Sets bootstrap servers and returns the updated value.
293    pub fn with_bootstrap_servers(
294        mut self,
295        servers: impl IntoIterator<Item = impl Into<String>>,
296    ) -> Self {
297        self.config = self.config.with_bootstrap_servers(servers);
298        self
299    }
300
301    /// Sets client id and returns the updated value.
302    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
303        self.config = self.config.with_client_id(client_id);
304        self
305    }
306
307    /// Sets request timeout and returns the updated value.
308    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
309        self.config = self.config.with_request_timeout(request_timeout);
310        self
311    }
312
313    /// Sets security protocol and returns the updated value.
314    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
315        self.config = self.config.with_security_protocol(security_protocol);
316        self
317    }
318
319    /// Sets tls and returns the updated value.
320    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
321        self.config = self.config.with_tls(tls);
322        self
323    }
324
325    /// Sets sasl and returns the updated value.
326    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
327        self.config = self.config.with_sasl(sasl);
328        self
329    }
330
331    /// Sets sasl plain and returns the updated value.
332    pub fn with_sasl_plain(
333        mut self,
334        username: impl Into<String>,
335        password: impl Into<String>,
336    ) -> Self {
337        self.config = self.config.with_sasl_plain(username, password);
338        self
339    }
340
341    /// Sets sasl scram sha 256 and returns the updated value.
342    pub fn with_sasl_scram_sha_256(
343        mut self,
344        username: impl Into<String>,
345        password: impl Into<String>,
346    ) -> Self {
347        self.config = self.config.with_sasl_scram_sha_256(username, password);
348        self
349    }
350
351    /// Sets sasl scram sha 512 and returns the updated value.
352    pub fn with_sasl_scram_sha_512(
353        mut self,
354        username: impl Into<String>,
355        password: impl Into<String>,
356    ) -> Self {
357        self.config = self.config.with_sasl_scram_sha_512(username, password);
358        self
359    }
360
361    /// Sets TCP connector and returns the updated value.
362    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
363        self.config = self.config.with_tcp_connector(tcp_connector);
364        self
365    }
366
367    /// Connects and returns an admin client.
368    pub async fn connect(self) -> Result<KafkaAdmin> {
369        KafkaAdmin::connect(self.config).await
370    }
371}
372
373#[derive(Debug, Clone)]
374/// Builder for a [`KafkaConsumer`].
375pub struct ConsumerBuilder {
376    config: ConsumerConfig,
377    topics: Vec<String>,
378    poll_timeout: Duration,
379}
380
381impl ConsumerBuilder {
382    fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
383        Self {
384            config: ConsumerConfig::new(bootstrap_server, group_id),
385            topics: Vec::new(),
386            poll_timeout: DEFAULT_POLL_TIMEOUT,
387        }
388    }
389
390    fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
391        let first = servers.first().cloned().unwrap_or_default();
392        let mut builder = Self::new(first, group_id);
393        builder.config = builder.config.with_bootstrap_servers(servers);
394        builder
395    }
396
397    /// Sets bootstrap servers and returns the updated value.
398    pub fn with_bootstrap_servers(
399        mut self,
400        servers: impl IntoIterator<Item = impl Into<String>>,
401    ) -> Self {
402        self.config = self.config.with_bootstrap_servers(servers);
403        self
404    }
405
406    /// Sets client id and returns the updated value.
407    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
408        self.config = self.config.with_client_id(client_id);
409        self
410    }
411
412    /// Sets security protocol and returns the updated value.
413    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
414        self.config = self.config.with_security_protocol(security_protocol);
415        self
416    }
417
418    /// Sets tls and returns the updated value.
419    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
420        self.config = self.config.with_tls(tls);
421        self
422    }
423
424    /// Sets sasl and returns the updated value.
425    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
426        self.config = self.config.with_sasl(sasl);
427        self
428    }
429
430    /// Sets sasl plain and returns the updated value.
431    pub fn with_sasl_plain(
432        mut self,
433        username: impl Into<String>,
434        password: impl Into<String>,
435    ) -> Self {
436        self.config = self.config.with_sasl_plain(username, password);
437        self
438    }
439
440    /// Sets sasl scram sha 256 and returns the updated value.
441    pub fn with_sasl_scram_sha_256(
442        mut self,
443        username: impl Into<String>,
444        password: impl Into<String>,
445    ) -> Self {
446        self.config = self.config.with_sasl_scram_sha_256(username, password);
447        self
448    }
449
450    /// Sets sasl scram sha 512 and returns the updated value.
451    pub fn with_sasl_scram_sha_512(
452        mut self,
453        username: impl Into<String>,
454        password: impl Into<String>,
455    ) -> Self {
456        self.config = self.config.with_sasl_scram_sha_512(username, password);
457        self
458    }
459
460    /// Sets topic and returns the updated value.
461    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
462        self.topics.push(topic.into());
463        self
464    }
465
466    /// Sets topics and returns the updated value.
467    pub fn with_topics<I, S>(mut self, topics: I) -> Self
468    where
469        I: IntoIterator<Item = S>,
470        S: Into<String>,
471    {
472        self.topics.extend(topics.into_iter().map(Into::into));
473        self
474    }
475
476    /// Sets auto offset reset and returns the updated value.
477    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
478        self.config = self.config.with_auto_offset_reset(auto_offset_reset);
479        self
480    }
481
482    /// Sets isolation level and returns the updated value.
483    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
484        self.config = self.config.with_isolation_level(isolation_level);
485        self
486    }
487
488    /// Sets auto commit and returns the updated value.
489    pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
490        self.config = self.config.with_enable_auto_commit(enable_auto_commit);
491        self
492    }
493
494    /// Sets request timeout and returns the updated value.
495    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
496        self.config = self.config.with_request_timeout(request_timeout);
497        self
498    }
499
500    /// Sets retry backoff and returns the updated value.
501    pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
502        self.config = self.config.with_retry_backoff(retry_backoff);
503        self
504    }
505
506    /// Sets max retries and returns the updated value.
507    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
508        self.config = self.config.with_max_retries(max_retries);
509        self
510    }
511
512    /// Sets instance id and returns the updated value.
513    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
514        self.config = self.config.with_instance_id(instance_id);
515        self
516    }
517
518    /// Sets rebalance listener and returns the updated value.
519    pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
520        self.config = self.config.with_rebalance_listener(listener);
521        self
522    }
523
524    /// Sets rebalance callback and returns the updated value.
525    pub fn with_rebalance_callback(
526        mut self,
527        callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
528    ) -> Self {
529        self.config = self.config.with_rebalance_callback(callback);
530        self
531    }
532
533    /// Sets TCP connector and returns the updated value.
534    pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
535        self.config = self.config.with_tcp_connector(tcp_connector);
536        self
537    }
538
539    /// Sets poll timeout and returns the updated value.
540    pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
541        self.poll_timeout = poll_timeout;
542        self
543    }
544
545    /// Connects and returns a consumer.
546    pub async fn connect(self) -> Result<KafkaConsumer> {
547        let topics = self
548            .topics
549            .into_iter()
550            .map(validate_topic_name)
551            .collect::<Result<Vec<_>>>()?;
552        let consumer = KafkaConsumer::connect(self.config).await?;
553        if !topics.is_empty()
554            && let Err(error) = consumer.subscribe(topics).await
555        {
556            let _ = consumer.shutdown().await;
557            return Err(error);
558        }
559        Ok(consumer.with_default_poll_timeout(self.poll_timeout))
560    }
561}
562
563fn validate_topic_name(topic: String) -> Result<String> {
564    let topic = topic.trim();
565    if topic.is_empty() {
566        return Err(anyhow!("topic must be non-empty").into());
567    }
568    Ok(topic.to_owned())
569}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use crate::SaslMechanism;
575
576    #[test]
577    fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
578        let client =
579            KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);
580
581        let producer = client
582            .topic("orders")
583            .producer()
584            .with_client_id("producer-a")
585            .with_default_partition(2)
586            .with_compression(ProducerCompression::Lz4)
587            .with_batch_size(32)
588            .with_linger(Duration::from_millis(10))
589            .with_delivery_timeout(Duration::from_secs(3))
590            .with_transactional_id("tx-a");
591
592        assert_eq!(
593            producer.config.bootstrap_servers,
594            vec!["host-a:9092", "host-b:9092"]
595        );
596        assert_eq!(producer.config.client_id, "producer-a");
597        assert_eq!(producer.config.compression, ProducerCompression::Lz4);
598        assert_eq!(producer.config.batch_size, 32);
599        assert_eq!(producer.config.linger, Duration::from_millis(10));
600        assert_eq!(producer.config.delivery_timeout, Duration::from_secs(3));
601        assert_eq!(producer.config.transactional_id.as_deref(), Some("tx-a"));
602        assert_eq!(producer.default_topic.as_deref(), Some("orders"));
603        assert_eq!(producer.default_partition, Some(2));
604    }
605
606    #[test]
607    fn admin_builder_forwards_security_and_timeout_options() {
608        let builder = KafkaClient::new("host-a:9092")
609            .admin()
610            .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
611            .with_client_id("admin-a")
612            .with_request_timeout(Duration::from_secs(9))
613            .with_security_protocol(SecurityProtocol::Ssl)
614            .with_tls(TlsConfig::new().with_server_name("kafka.internal"))
615            .with_sasl_scram_sha_512("user-a", "secret-a");
616
617        assert_eq!(
618            builder.config.bootstrap_servers,
619            vec!["host-b:9092", "host-c:9092"]
620        );
621        assert_eq!(builder.config.client_id, "admin-a");
622        assert_eq!(builder.config.request_timeout, Duration::from_secs(9));
623        assert_eq!(builder.config.security_protocol, SecurityProtocol::SaslSsl);
624        assert_eq!(
625            builder.config.tls.server_name.as_deref(),
626            Some("kafka.internal")
627        );
628        assert_eq!(builder.config.sasl.mechanism, SaslMechanism::ScramSha512);
629    }
630
631    #[test]
632    fn consumer_builder_collects_topics_and_group_options() {
633        let builder = KafkaClient::new("host-a:9092")
634            .topic("orders")
635            .consumer("group-a")
636            .with_bootstrap_servers(["host-b:9092"])
637            .with_client_id("consumer-a")
638            .with_topic("payments")
639            .with_topics(["shipments", "invoices"])
640            .with_auto_offset_reset(AutoOffsetReset::Latest)
641            .with_isolation_level(IsolationLevel::ReadCommitted)
642            .with_auto_commit(true)
643            .with_instance_id("instance-a")
644            .with_poll_timeout(Duration::from_millis(250))
645            .with_sasl_plain("user-a", "secret-a");
646
647        assert_eq!(builder.config.bootstrap_servers, vec!["host-b:9092"]);
648        assert_eq!(builder.config.client_id, "consumer-a");
649        assert_eq!(builder.config.group_id, "group-a");
650        assert_eq!(
651            builder.topics,
652            vec!["orders", "payments", "shipments", "invoices"]
653        );
654        assert_eq!(builder.config.auto_offset_reset, AutoOffsetReset::Latest);
655        assert_eq!(
656            builder.config.isolation_level,
657            IsolationLevel::ReadCommitted
658        );
659        assert!(builder.config.enable_auto_commit);
660        assert_eq!(builder.config.instance_id.as_deref(), Some("instance-a"));
661        assert_eq!(builder.poll_timeout, Duration::from_millis(250));
662        assert_eq!(
663            builder.config.security_protocol,
664            SecurityProtocol::SaslPlaintext
665        );
666    }
667
668    #[test]
669    fn validate_topic_name_trims_and_rejects_empty_names() {
670        assert_eq!(
671            validate_topic_name("  topic-a  ".to_owned()).unwrap(),
672            "topic-a"
673        );
674        assert!(validate_topic_name("   ".to_owned()).is_err());
675    }
676}