Skip to main content

kafkit_client/
api.rs

//! Builder-style entry points for the public client API.
//!
//! ```no_run
//! # async fn example() -> kafkit_client::Result<()> {
//! use kafkit_client::KafkaClient;
//!
//! let client = KafkaClient::new("localhost:9092");
//! let producer = client.topic("orders").producer().connect().await?;
//! producer.send_value("created").await?;
//! producer.shutdown().await?;
//! # Ok(())
//! # }
//! ```
//!
15use std::time::Duration;
16
17use anyhow::anyhow;
18
19use crate::admin::KafkaAdmin;
20use crate::consumer::KafkaConsumer;
21use crate::producer::KafkaProducer;
22use crate::{
23    AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
24    ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
25};
26
/// Poll timeout a freshly created [`ConsumerBuilder`] starts with; callers can
/// replace it through [`ConsumerBuilder::with_poll_timeout`].
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(1_000);
28
/// Lightweight facade that remembers a list of bootstrap servers and hands
/// them to the producer, consumer and admin builders it creates.
#[derive(Debug, Clone)]
pub struct KafkaClient {
    /// Bootstrap servers cloned into every builder created from this facade.
    bootstrap_servers: Vec<String>,
}
34
35impl KafkaClient {
36    /// Creates a client facade from one bootstrap server.
37    pub fn new(bootstrap_server: impl Into<String>) -> Self {
38        Self {
39            bootstrap_servers: vec![bootstrap_server.into()],
40        }
41    }
42
43    /// Sets bootstrap servers and returns the updated value.
44    pub fn with_bootstrap_servers(
45        mut self,
46        servers: impl IntoIterator<Item = impl Into<String>>,
47    ) -> Self {
48        self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
49        self
50    }
51
52    /// Returns a topic-scoped builder facade.
53    pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
54        KafkaTopic {
55            bootstrap_servers: self.bootstrap_servers.clone(),
56            topic: topic.into(),
57        }
58    }
59
60    /// Starts building a producer.
61    pub fn producer(&self) -> ProducerBuilder {
62        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
63    }
64
65    /// Starts building an admin client.
66    pub fn admin(&self) -> AdminBuilder {
67        AdminBuilder::from_servers(self.bootstrap_servers.clone())
68    }
69
70    /// Starts building a group consumer.
71    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
72        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
73    }
74}
75
/// Facade bound to a single topic; the producer and consumer builders it
/// creates start out with that topic already selected.
#[derive(Debug, Clone)]
pub struct KafkaTopic {
    /// Bootstrap servers cloned into every builder created from this facade.
    bootstrap_servers: Vec<String>,
    /// Topic name passed on to the builders.
    topic: String,
}
82
83impl KafkaTopic {
84    /// Starts building a producer with this topic as the default.
85    pub fn producer(&self) -> ProducerBuilder {
86        ProducerBuilder::from_servers(self.bootstrap_servers.clone())
87            .with_default_topic(self.topic.clone())
88    }
89
90    /// Starts building a consumer already subscribed to this topic.
91    pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
92        ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
93            .with_topic(self.topic.clone())
94    }
95}
96
97#[derive(Debug, Clone)]
98/// Builder for a [`KafkaProducer`].
99pub struct ProducerBuilder {
100    config: ProducerConfig,
101    default_topic: Option<String>,
102    default_partition: Option<i32>,
103}
104
105impl ProducerBuilder {
106    fn new(bootstrap_server: impl Into<String>) -> Self {
107        Self {
108            config: ProducerConfig::new(bootstrap_server),
109            default_topic: None,
110            default_partition: None,
111        }
112    }
113
114    fn from_servers(servers: Vec<String>) -> Self {
115        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
116        builder.config = builder.config.with_bootstrap_servers(servers);
117        builder
118    }
119
120    /// Sets bootstrap servers and returns the updated value.
121    pub fn with_bootstrap_servers(
122        mut self,
123        servers: impl IntoIterator<Item = impl Into<String>>,
124    ) -> Self {
125        self.config = self.config.with_bootstrap_servers(servers);
126        self
127    }
128
129    /// Sets client id and returns the updated value.
130    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
131        self.config = self.config.with_client_id(client_id);
132        self
133    }
134
135    /// Sets security protocol and returns the updated value.
136    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
137        self.config = self.config.with_security_protocol(security_protocol);
138        self
139    }
140
141    /// Sets tls and returns the updated value.
142    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
143        self.config = self.config.with_tls(tls);
144        self
145    }
146
147    /// Sets sasl and returns the updated value.
148    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
149        self.config = self.config.with_sasl(sasl);
150        self
151    }
152
153    /// Sets sasl plain and returns the updated value.
154    pub fn with_sasl_plain(
155        mut self,
156        username: impl Into<String>,
157        password: impl Into<String>,
158    ) -> Self {
159        self.config = self.config.with_sasl_plain(username, password);
160        self
161    }
162
163    /// Sets sasl scram sha 256 and returns the updated value.
164    pub fn with_sasl_scram_sha_256(
165        mut self,
166        username: impl Into<String>,
167        password: impl Into<String>,
168    ) -> Self {
169        self.config = self.config.with_sasl_scram_sha_256(username, password);
170        self
171    }
172
173    /// Sets sasl scram sha 512 and returns the updated value.
174    pub fn with_sasl_scram_sha_512(
175        mut self,
176        username: impl Into<String>,
177        password: impl Into<String>,
178    ) -> Self {
179        self.config = self.config.with_sasl_scram_sha_512(username, password);
180        self
181    }
182
183    /// Sets compression and returns the updated value.
184    pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
185        self.config = self.config.with_compression(compression);
186        self
187    }
188
189    /// Sets default topic and returns the updated value.
190    pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
191        self.default_topic = Some(topic.into());
192        self
193    }
194
195    /// Sets default partition and returns the updated value.
196    pub fn with_default_partition(mut self, partition: i32) -> Self {
197        self.default_partition = Some(partition);
198        self
199    }
200
201    /// Sets enable idempotence and returns the updated value.
202    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
203        self.config = self.config.with_enable_idempotence(enable_idempotence);
204        self
205    }
206
207    /// Sets batch size and returns the updated value.
208    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
209        self.config = self.config.with_batch_size(batch_size);
210        self
211    }
212
213    /// Sets linger and returns the updated value.
214    pub fn with_linger(mut self, linger: Duration) -> Self {
215        self.config = self.config.with_linger(linger);
216        self
217    }
218
219    /// Sets delivery timeout and returns the updated value.
220    pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
221        self.config = self.config.with_delivery_timeout(delivery_timeout);
222        self
223    }
224
225    /// Sets max in-flight requests per broker connection and returns the updated value.
226    pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
227        self.config = self
228            .config
229            .with_max_in_flight_requests_per_connection(max_in_flight);
230        self
231    }
232
233    /// Sets transactional id and returns the updated value.
234    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
235        self.config = self.config.with_transactional_id(transactional_id);
236        self
237    }
238
239    /// Connects and returns a producer.
240    pub async fn connect(self) -> Result<KafkaProducer> {
241        let producer = KafkaProducer::connect(self.config).await?;
242        Ok(producer.with_defaults(self.default_topic, self.default_partition))
243    }
244}
245
246#[derive(Debug, Clone)]
247/// Builder for a [`KafkaAdmin`].
248pub struct AdminBuilder {
249    config: AdminConfig,
250}
251
252impl AdminBuilder {
253    fn new(bootstrap_server: impl Into<String>) -> Self {
254        Self {
255            config: AdminConfig::new(bootstrap_server),
256        }
257    }
258
259    fn from_servers(servers: Vec<String>) -> Self {
260        let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
261        builder.config = builder.config.with_bootstrap_servers(servers);
262        builder
263    }
264
265    /// Sets bootstrap servers and returns the updated value.
266    pub fn with_bootstrap_servers(
267        mut self,
268        servers: impl IntoIterator<Item = impl Into<String>>,
269    ) -> Self {
270        self.config = self.config.with_bootstrap_servers(servers);
271        self
272    }
273
274    /// Sets client id and returns the updated value.
275    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
276        self.config = self.config.with_client_id(client_id);
277        self
278    }
279
280    /// Sets request timeout and returns the updated value.
281    pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
282        self.config = self.config.with_request_timeout(request_timeout);
283        self
284    }
285
286    /// Sets security protocol and returns the updated value.
287    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
288        self.config = self.config.with_security_protocol(security_protocol);
289        self
290    }
291
292    /// Sets tls and returns the updated value.
293    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
294        self.config = self.config.with_tls(tls);
295        self
296    }
297
298    /// Sets sasl and returns the updated value.
299    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
300        self.config = self.config.with_sasl(sasl);
301        self
302    }
303
304    /// Sets sasl plain and returns the updated value.
305    pub fn with_sasl_plain(
306        mut self,
307        username: impl Into<String>,
308        password: impl Into<String>,
309    ) -> Self {
310        self.config = self.config.with_sasl_plain(username, password);
311        self
312    }
313
314    /// Sets sasl scram sha 256 and returns the updated value.
315    pub fn with_sasl_scram_sha_256(
316        mut self,
317        username: impl Into<String>,
318        password: impl Into<String>,
319    ) -> Self {
320        self.config = self.config.with_sasl_scram_sha_256(username, password);
321        self
322    }
323
324    /// Sets sasl scram sha 512 and returns the updated value.
325    pub fn with_sasl_scram_sha_512(
326        mut self,
327        username: impl Into<String>,
328        password: impl Into<String>,
329    ) -> Self {
330        self.config = self.config.with_sasl_scram_sha_512(username, password);
331        self
332    }
333
334    /// Connects and returns an admin client.
335    pub async fn connect(self) -> Result<KafkaAdmin> {
336        KafkaAdmin::connect(self.config).await
337    }
338}
339
340#[derive(Debug, Clone)]
341/// Builder for a [`KafkaConsumer`].
342pub struct ConsumerBuilder {
343    config: ConsumerConfig,
344    topics: Vec<String>,
345    poll_timeout: Duration,
346}
347
348impl ConsumerBuilder {
349    fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
350        Self {
351            config: ConsumerConfig::new(bootstrap_server, group_id),
352            topics: Vec::new(),
353            poll_timeout: DEFAULT_POLL_TIMEOUT,
354        }
355    }
356
357    fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
358        let first = servers.first().cloned().unwrap_or_default();
359        let mut builder = Self::new(first, group_id);
360        builder.config = builder.config.with_bootstrap_servers(servers);
361        builder
362    }
363
364    /// Sets bootstrap servers and returns the updated value.
365    pub fn with_bootstrap_servers(
366        mut self,
367        servers: impl IntoIterator<Item = impl Into<String>>,
368    ) -> Self {
369        self.config = self.config.with_bootstrap_servers(servers);
370        self
371    }
372
373    /// Sets client id and returns the updated value.
374    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
375        self.config = self.config.with_client_id(client_id);
376        self
377    }
378
379    /// Sets security protocol and returns the updated value.
380    pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
381        self.config = self.config.with_security_protocol(security_protocol);
382        self
383    }
384
385    /// Sets tls and returns the updated value.
386    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
387        self.config = self.config.with_tls(tls);
388        self
389    }
390
391    /// Sets sasl and returns the updated value.
392    pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
393        self.config = self.config.with_sasl(sasl);
394        self
395    }
396
397    /// Sets sasl plain and returns the updated value.
398    pub fn with_sasl_plain(
399        mut self,
400        username: impl Into<String>,
401        password: impl Into<String>,
402    ) -> Self {
403        self.config = self.config.with_sasl_plain(username, password);
404        self
405    }
406
407    /// Sets sasl scram sha 256 and returns the updated value.
408    pub fn with_sasl_scram_sha_256(
409        mut self,
410        username: impl Into<String>,
411        password: impl Into<String>,
412    ) -> Self {
413        self.config = self.config.with_sasl_scram_sha_256(username, password);
414        self
415    }
416
417    /// Sets sasl scram sha 512 and returns the updated value.
418    pub fn with_sasl_scram_sha_512(
419        mut self,
420        username: impl Into<String>,
421        password: impl Into<String>,
422    ) -> Self {
423        self.config = self.config.with_sasl_scram_sha_512(username, password);
424        self
425    }
426
427    /// Sets topic and returns the updated value.
428    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
429        self.topics.push(topic.into());
430        self
431    }
432
433    /// Sets topics and returns the updated value.
434    pub fn with_topics<I, S>(mut self, topics: I) -> Self
435    where
436        I: IntoIterator<Item = S>,
437        S: Into<String>,
438    {
439        self.topics.extend(topics.into_iter().map(Into::into));
440        self
441    }
442
443    /// Sets auto offset reset and returns the updated value.
444    pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
445        self.config = self.config.with_auto_offset_reset(auto_offset_reset);
446        self
447    }
448
449    /// Sets isolation level and returns the updated value.
450    pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
451        self.config = self.config.with_isolation_level(isolation_level);
452        self
453    }
454
455    /// Sets auto commit and returns the updated value.
456    pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
457        self.config = self.config.with_enable_auto_commit(enable_auto_commit);
458        self
459    }
460
461    /// Sets instance id and returns the updated value.
462    pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
463        self.config = self.config.with_instance_id(instance_id);
464        self
465    }
466
467    /// Sets poll timeout and returns the updated value.
468    pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
469        self.poll_timeout = poll_timeout;
470        self
471    }
472
473    /// Connects and returns a consumer.
474    pub async fn connect(self) -> Result<KafkaConsumer> {
475        let topics = self
476            .topics
477            .into_iter()
478            .map(validate_topic_name)
479            .collect::<Result<Vec<_>>>()?;
480        let consumer = KafkaConsumer::connect(self.config).await?;
481        if !topics.is_empty()
482            && let Err(error) = consumer.subscribe(topics).await
483        {
484            let _ = consumer.shutdown().await;
485            return Err(error);
486        }
487        Ok(consumer.with_default_poll_timeout(self.poll_timeout))
488    }
489}
490
491fn validate_topic_name(topic: String) -> Result<String> {
492    let topic = topic.trim();
493    if topic.is_empty() {
494        return Err(anyhow!("topic must be non-empty").into());
495    }
496    Ok(topic.to_owned())
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SaslMechanism;

    /// A producer built through a topic facade keeps the client's server list,
    /// the topic default, and every forwarded option.
    #[test]
    fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
        let servers = ["host-a:9092", "host-b:9092"];
        let client = KafkaClient::new("host-a:9092").with_bootstrap_servers(servers);
        let orders = client.topic("orders");

        let producer = orders
            .producer()
            .with_client_id("producer-a")
            .with_default_partition(2)
            .with_compression(ProducerCompression::Lz4)
            .with_batch_size(32)
            .with_linger(Duration::from_millis(10))
            .with_delivery_timeout(Duration::from_secs(3))
            .with_transactional_id("tx-a");

        let config = &producer.config;
        assert_eq!(
            config.bootstrap_servers,
            vec!["host-a:9092", "host-b:9092"]
        );
        assert_eq!(config.client_id, "producer-a");
        assert_eq!(config.compression, ProducerCompression::Lz4);
        assert_eq!(config.batch_size, 32);
        assert_eq!(config.linger, Duration::from_millis(10));
        assert_eq!(config.delivery_timeout, Duration::from_secs(3));
        assert_eq!(config.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(producer.default_topic.as_deref(), Some("orders"));
        assert_eq!(producer.default_partition, Some(2));
    }

    /// Admin builder options end up in the admin config; SCRAM credentials on
    /// top of `Ssl` are expected to yield `SaslSsl`.
    #[test]
    fn admin_builder_forwards_security_and_timeout_options() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let admin = KafkaClient::new("host-a:9092")
            .admin()
            .with_bootstrap_servers(["host-b:9092", "host-c:9092"])
            .with_client_id("admin-a")
            .with_request_timeout(Duration::from_secs(9))
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls(tls)
            .with_sasl_scram_sha_512("user-a", "secret-a");

        let config = &admin.config;
        assert_eq!(
            config.bootstrap_servers,
            vec!["host-b:9092", "host-c:9092"]
        );
        assert_eq!(config.client_id, "admin-a");
        assert_eq!(config.request_timeout, Duration::from_secs(9));
        assert_eq!(config.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(config.tls.server_name.as_deref(), Some("kafka.internal"));
        assert_eq!(config.sasl.mechanism, SaslMechanism::ScramSha512);
    }

    /// Topics accumulate in insertion order and group options are forwarded;
    /// SASL/PLAIN credentials are expected to yield `SaslPlaintext`.
    #[test]
    fn consumer_builder_collects_topics_and_group_options() {
        let consumer = KafkaClient::new("host-a:9092")
            .topic("orders")
            .consumer("group-a")
            .with_bootstrap_servers(["host-b:9092"])
            .with_client_id("consumer-a")
            .with_topic("payments")
            .with_topics(["shipments", "invoices"])
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_auto_commit(true)
            .with_instance_id("instance-a")
            .with_poll_timeout(Duration::from_millis(250))
            .with_sasl_plain("user-a", "secret-a");

        let config = &consumer.config;
        assert_eq!(config.bootstrap_servers, vec!["host-b:9092"]);
        assert_eq!(config.client_id, "consumer-a");
        assert_eq!(config.group_id, "group-a");
        assert_eq!(
            consumer.topics,
            vec!["orders", "payments", "shipments", "invoices"]
        );
        assert_eq!(config.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(config.isolation_level, IsolationLevel::ReadCommitted);
        assert!(config.enable_auto_commit);
        assert_eq!(config.instance_id.as_deref(), Some("instance-a"));
        assert_eq!(consumer.poll_timeout, Duration::from_millis(250));
        assert_eq!(config.security_protocol, SecurityProtocol::SaslPlaintext);
    }

    /// Surrounding whitespace is stripped; whitespace-only names are rejected.
    #[test]
    fn validate_topic_name_trims_and_rejects_empty_names() {
        let trimmed = validate_topic_name(String::from("  topic-a  ")).unwrap();
        assert_eq!(trimmed, "topic-a");

        let blank = validate_topic_name(String::from("   "));
        assert!(blank.is_err());
    }
}