kafka_http/
lib.rs

1use reqwest::{header, Client};
2use crate::api::{commit, create_consumer, poll, produce, subscribe};
3use crate::error::Error;
4use crate::error::ErrorType::InvalidInput;
5use crate::types::{CreateConsumerParams, PartitionOffsetCommitParams, ProduceParams, Record, SubscribeParams};
6use base64::{engine::general_purpose, Engine as _};
7
8pub(crate) mod api;
9pub mod error;
10pub mod types;
11
12
13/// A client for interacting with Kafka REST Proxy over HTTP.
14///
15/// This client provides methods for creating consumers, subscribing to topics,
16/// polling for records, producing messages, and committing offsets using the Kafka REST API.
17///
18/// # Examples
19///
20/// ```no_run
21/// use kafka_http::KafkaHttpClient;
22///
23/// let mut client = KafkaHttpClient::new("http://localhost:8082");
24/// client.set_timeout_ms(5000);
25/// ```
26#[derive(Debug, Clone)]
27pub struct KafkaHttpClient {
28    pub(crate) client: Client,
29    pub(crate) base_uri: String,
30    pub(crate) consumer_uri: Option<String>,
31    pub(crate) timeout_ms: u64,
32    pub(crate) target_topic: Option<String>
33}
34
35impl KafkaHttpClient {
36    /// Creates a new `KafkaHttpClient` instance.
37    ///
38    /// # Arguments
39    ///
40    /// * `base_uri` - The base URI of the Kafka REST Proxy (e.g., "http://localhost:8082")
41    ///
42    /// # Returns
43    ///
44    /// A new `KafkaHttpClient` with default timeout of 1000ms
45    ///
46    /// # Examples
47    ///
48    /// ```no_run
49    /// use kafka_http::KafkaHttpClient;
50    ///
51    /// let client = KafkaHttpClient::new("http://localhost:8082");
52    /// ```
53    pub fn new(base_uri: &str) -> Self {
54        Self {
55            client: reqwest::Client::new(),
56            base_uri: base_uri.to_string(),
57            target_topic: None,
58            consumer_uri: None,
59            timeout_ms: 1000,
60        }
61    }
62
63    /// Helper util to rebuild and set the uri if a conflict exists
64    fn rebuild_consumer_uri(&self, group: &str, consumer_id: &str) -> String {
65        // "http://localhost:18082/consumers/demo-group/instances/consumer-1"
66        format!("{}/consumers/{group}/instances/{consumer_id}", self.base_uri)
67    }
68
69
70    /// Sets HTTP Basic Authentication credentials for the client.
71    ///
72    /// This method configures the client to use HTTP Basic Authentication by encoding
73    /// the provided username and password and adding them as a default Authorization
74    /// header to all subsequent requests.
75    ///
76    /// # Arguments
77    ///
78    /// * `username` - The username for Basic Authentication
79    /// * `password` - The password for Basic Authentication
80    ///
81    /// # Returns
82    ///
83    /// * `Ok(())` - Authentication was successfully configured
84    /// * `Err(Error)` - An error if the client could not be rebuilt with the new headers
85    ///
86    /// # Examples
87    ///
88    /// ```no_run
89    /// use kafka_http::KafkaHttpClient;
90    ///
91    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
92    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
93    /// client.set_basic_auth("my-username", "my-password")?;
94    /// // All subsequent requests will include the Basic Auth header
95    /// # Ok(())
96    /// # }
97    /// ```
98    pub fn set_basic_auth(&mut self, username: &str, password: &str) -> Result<(), Error> {
99        // Create a new client with default headers the provided auth params
100        let auth_str = format!("{}:{}", username, password);
101        let encoded = general_purpose::STANDARD.encode(auth_str);
102
103        let mut headers = header::HeaderMap::new();
104        headers.insert(
105            header::AUTHORIZATION,
106            header::HeaderValue::from_str(&format!("Basic {}", encoded)).unwrap(),
107        );
108
109        // Replace the client with one that has the default header
110        let new_client = Client::builder()
111            .default_headers(headers)
112            .build()?;
113
114        self.client = new_client;
115        Ok(())
116    }
117
118    /// Sets the consumer URI for this client.
119    ///
120    /// This is typically called internally after creating a consumer, but can be used
121    /// to manually set a consumer URI if needed.
122    ///
123    /// # Arguments
124    ///
125    /// * `uri` - The consumer instance URI to set
126    pub fn set_consumer_uri(&mut self, uri: &String) {
127        tracing::debug!("Setting consumer URI to {uri}");
128        self.consumer_uri = Some(uri.clone());
129    }
130
131    /// Sets the timeout duration for polling operations.
132    ///
133    /// # Arguments
134    ///
135    /// * `timeout_ms` - The timeout duration in milliseconds
136    ///
137    /// # Examples
138    ///
139    /// ```no_run
140    /// use kafka_http::KafkaHttpClient;
141    ///
142    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
143    /// client.set_timeout_ms(5000);  // Set 5 second timeout
144    /// ```
145    pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
146        tracing::debug!("Setting timeout to {timeout_ms} ms");
147        self.timeout_ms = timeout_ms;
148    }
149
150    /// Creates a new consumer in the specified consumer group.
151    ///
152    /// This method will fail if the consumer already exists. Use `try_create_consumer`
153    /// if you want to handle existing consumers gracefully.
154    ///
155    /// # Arguments
156    ///
157    /// * `group` - The consumer group name
158    /// * `params` - Parameters for creating the consumer (name, format, etc.)
159    ///
160    /// # Returns
161    ///
162    /// * `Ok(String)` - The consumer instance URI on success
163    /// * `Err(Error)` - An error if the consumer creation fails or URI is not returned
164    ///
165    /// # Examples
166    ///
167    /// ```no_run
168    /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
169    ///
170    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
171    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
172    /// let params = CreateConsumerParams {
173    ///     name: "consumer-1".to_string(),
174    ///     format: "json".to_string(),
175    ///     ..Default::default()
176    /// };
177    /// let consumer_uri = client.create_consumer("my-group", &params).await?;
178    /// # Ok(())
179    /// # }
180    /// ```
181    pub async fn create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> {
182        tracing::debug!("Creating consumer for group {group} - params: {params:?}");
183        let try_consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
184
185        let Some(consumer_uri) = try_consumer_uri else {
186            tracing::error!("failed to create consumer, consumer_uri is not set");
187            return Err(Error::new(InvalidInput, "failed to create consumer, consumer_uri is not set"));
188        };
189
190        tracing::debug!("setting stored consumer_uri: {consumer_uri}");
191        self.set_consumer_uri(&consumer_uri);
192        Ok(consumer_uri.clone())
193    }
194
195    /// Attempts to create a new consumer, handling the case where the consumer already exists.
196    ///
197    /// If the consumer already exists, this method will reconstruct the consumer URI based on
198    /// the group and consumer name, and return it wrapped in `Some`. This allows reconnecting
199    /// to existing consumers.
200    ///
201    /// # Arguments
202    ///
203    /// * `group` - The consumer group name
204    /// * `params` - Parameters for creating the consumer (name, format, etc.)
205    ///
206    /// # Returns
207    ///
208    /// * `Ok(Some(String))` - The consumer instance URI (new or reconstructed)
209    /// * `Ok(None)` - Should not occur in current implementation
210    /// * `Err(Error)` - An error if the operation fails
211    ///
212    /// # Examples
213    ///
214    /// ```no_run
215    /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
216    ///
217    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
218    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
219    /// let params = CreateConsumerParams {
220    ///     name: "consumer-1".to_string(),
221    ///     format: "json".to_string(),
222    ///     ..Default::default()
223    /// };
224    /// // This won't fail if consumer already exists
225    /// let consumer_uri = client.try_create_consumer("my-group", &params).await?;
226    /// # Ok(())
227    /// # }
228    /// ```
229    pub async fn try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> {
230        tracing::debug!("Creating consumer for group {group} - params: {params:?}");
231        let consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
232        match consumer_uri {
233            Some(new_uri) => {
234                tracing::debug!("setting stored consumer_uri: {new_uri}");
235                self.set_consumer_uri(&new_uri);
236                Ok(Some(new_uri))
237            }
238            None => {
239                tracing::debug!("consumer already exists");
240                let group = group.to_string();
241                let consumer_id = &params.name;
242                let built_consumer_uri = self.rebuild_consumer_uri(&group, consumer_id);
243
244                tracing::debug!("setting stored consumer_uri: {built_consumer_uri}");
245
246                self.set_consumer_uri(&built_consumer_uri);
247                Ok(self.consumer_uri.clone())
248            }
249        }
250    }
251
252    /// Subscribes the consumer to one or more topics.
253    ///
254    /// A consumer must be created before calling this method. The consumer URI must be set
255    /// (either by creating a consumer or manually setting it).
256    ///
257    /// # Arguments
258    ///
259    /// * `group` - The consumer group name (used for logging)
260    /// * `params` - Subscription parameters including topics to subscribe to
261    ///
262    /// # Returns
263    ///
264    /// * `Ok(())` - Subscription was successful
265    /// * `Err(Error)` - An error if consumer URI is not set or subscription fails
266    ///
267    /// # Examples    /// Polls for new records from subscribed topics.
268    ///
269    /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
270    /// created and subscribed to topics before polling.
271    ///
272    /// # Returns
273    ///
274    /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
275    /// * `Err(Error)` - An error if consumer URI is not set or polling fails
276    ///
277    /// # Examples
278    ///
279    /// ```no_run
280    /// use kafka_http::KafkaHttpClient;
281    ///
282    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
283    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
284    /// // ... create consumer and subscribe ...
285    /// let records = client.poll().await?;
286    /// for record in records {
287    ///     println!("Received: {:?}", record);
288    /// }
289    /// # Ok(())
290    /// # }
291    /// ```
292    ///
293    /// ```no_run
294    /// use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
295    ///
296    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
297    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
298    /// // ... create consumer first ...
299    /// let subscribe_params = SubscribeParams {
300    ///     topics: vec!["my-topic".to_string()],
301    ///     ..Default::default()
302    /// };
303    /// client.subscribe("my-group", &subscribe_params).await?;
304    /// # Ok(())
305    /// # }
306    /// ```
307    pub async fn subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> {
308        let Some(consumer_uri) = &self.consumer_uri else {
309            tracing::error!("Consumer URI is not set");
310            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
311        };
312        tracing::debug!("Subscribing to {group} - params: {params:?}");
313        subscribe(&self.client, &consumer_uri, params).await
314    }
315
316    /// Polls for new records from subscribed topics.
317    ///
318    /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
319    /// created and subscribed to topics before polling.
320    ///
321    /// # Returns
322    ///
323    /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
324    /// * `Err(Error)` - An error if consumer URI is not set or polling fails
325    ///
326    /// # Examples
327    ///
328    /// ```no_run
329    /// use kafka_http::KafkaHttpClient;
330    ///
331    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
332    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
333    /// // ... create consumer and subscribe ...
334    /// let records = client.poll().awaait?;
335    /// for record in records {
336    ///     println!("Received: {:?}", record);
337    /// }
338    /// # Ok(())
339    /// # }
340    /// ```
341    pub async fn poll(&self) -> Result<Vec<Record>, Error> {
342        let Some(consumer_uri) = &self.consumer_uri else {
343            tracing::error!("Consumer URI is not set");
344            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
345        };
346        tracing::debug!("Polling");
347        poll(&self.client, &consumer_uri, self.timeout_ms).await
348    }
349
350    /// Produces records to a Kafka topic.
351    ///
352    /// # Arguments
353    ///
354    /// * `topic` - The name of the topic to produce to
355    /// * `params` - Parameters containing the records to produce
356    ///
357    /// # Returns
358    ///
359    /// * `Ok(())` - Records were successfully produced
360    /// * `Err(Error)` - An error if production fails
361    ///
362    /// # Examples
363    ///
364    /// ```no_run
365    /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
366    ///
367    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
368    /// let client = KafkaHttpClient::new("http://localhost:8082");
369    /// let params = ProduceParams {
370    ///     records: vec![/* records */],
371    ///     ..Default::default()
372    /// };
373    /// client.produce_to_topic("my-topic", &params).await?;
374    /// # Ok(())
375    /// # }
376    /// ```
377    pub async fn produce_to_topic(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> {
378        tracing::debug!("producing records");
379        produce(&self.client, &self.base_uri, topic, params).await
380    }
381
382    /// Sets the target topic for subsequent produce operations.
383    ///
384    /// This allows using the `produce()` method without specifying a topic each time.
385    /// The topic will be used for all subsequent calls to `produce()` until changed.
386    ///
387    /// # Arguments
388    ///
389    /// * `topic` - The name of the topic to use for future produce operations
390    ///
391    /// # Examples
392    ///
393    /// ```no_run
394    /// use kafka_http::KafkaHttpClient;
395    ///
396    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
397    /// client.set_target_topic("my-topic");
398    /// // Now can use produce() without specifying topic
399    /// ```
400    pub fn set_target_topic(&mut self, topic: &str) {
401        self.target_topic = Some(topic.to_string());
402    }
403
404    /// Produces records to the previously set target topic.
405    ///
406    /// This is a convenience method that uses the topic set via `set_target_topic()`.
407    /// If no target topic has been set, this method will return an error.
408    ///
409    /// # Arguments
410    ///
411    /// * `params` - Parameters containing the records to produce
412    ///
413    /// # Returns
414    ///
415    /// * `Ok(())` - Records were successfully produced
416    /// * `Err(Error)` - An error if target topic is not set or production fails
417    ///
418    /// # Examples
419    ///
420    /// ```no_run
421    /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
422    ///
423    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
424    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
425    /// client.set_target_topic("my-topic");
426    ///
427    /// let params = ProduceParams {
428    ///     records: vec![/* records */],
429    ///     ..Default::default()
430    /// };
431    /// client.produce(&params).await?;
432    /// # Ok(())
433    /// # }
434    /// ```
435    pub async fn produce(&self, params: &ProduceParams) -> Result<(), Error> {
436        let Some(topic) = &self.target_topic else {
437            tracing::error!("Target topic is not set");
438            return Err(Error::new(InvalidInput, "Target topic is not set"));
439        };
440        tracing::debug!("producing records to topic: {topic}");
441        produce(&self.client, &self.base_uri, topic, params).await
442    }
443    /// Commits offsets for consumed messages.
444    ///
445    /// A consumer must be created before calling this method. The consumer URI must be set.
446    ///
447    /// # Arguments
448    ///
449    /// * `params` - Parameters specifying which partition offsets to commit
450    ///
451    /// # Returns
452    ///
453    /// * `Ok(())` - Offsets were successfully committed
454    /// * `Err(Error)` - An error if consumer URI is not set or commit fails
455    ///
456    /// # Examples
457    ///
458    /// ```no_run
459    /// use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
460    ///
461    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
462    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
463    /// // ... create consumer, subscribe, and poll ...
464    /// let params = PartitionOffsetCommitParams {
465    ///     partitions: vec![/* partition offsets */],
466    ///     ..Default::default()
467    /// };
468    /// client.commit(&params).await?;
469    /// # Ok(())
470    /// # }
471    /// ```
472    pub async fn commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> {
473        let Some(consumer_uri) = &self.consumer_uri else {
474            tracing::error!("Consumer URI is not set");
475            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
476        };
477        tracing::debug!("commiting offsets");
478        commit(&self.client, &consumer_uri, params).await
479    }
480}