kafka_http/
lib.rs

1use reqwest::Client;
2use crate::api::{commit, create_consumer, poll, produce, subscribe};
3use crate::error::Error;
4use crate::error::ErrorType::InvalidInput;
5use crate::types::{CreateConsumerParams, PartitionOffsetCommitParams, ProduceParams, Record, SubscribeParams};
6
7pub(crate) mod api;
8pub mod error;
9pub mod types;
10
11
12/// A client for interacting with Kafka REST Proxy over HTTP.
13///
14/// This client provides methods for creating consumers, subscribing to topics,
15/// polling for records, producing messages, and committing offsets using the Kafka REST API.
16///
17/// # Examples
18///
19/// ```no_run
20/// use kafka_http::KafkaHttpClient;
21///
22/// let mut client = KafkaHttpClient::new("http://localhost:8082");
23/// client.set_timeout_ms(5000);
24/// ```
25#[derive(Debug, Clone)]
26pub struct KafkaHttpClient {
27    pub(crate) client: Client,
28    pub(crate) base_uri: String,
29    pub(crate) consumer_uri: Option<String>,
30    pub(crate) timeout_ms: u64,
31    pub(crate) target_topic: Option<String>
32}
33
34impl KafkaHttpClient {
35    /// Creates a new `KafkaHttpClient` instance.
36    ///
37    /// # Arguments
38    ///
39    /// * `base_uri` - The base URI of the Kafka REST Proxy (e.g., "http://localhost:8082")
40    ///
41    /// # Returns
42    ///
43    /// A new `KafkaHttpClient` with default timeout of 1000ms
44    ///
45    /// # Examples
46    ///
47    /// ```no_run
48    /// use kafka_http::KafkaHttpClient;
49    ///
50    /// let client = KafkaHttpClient::new("http://localhost:8082");
51    /// ```
52    pub fn new(base_uri: &str) -> Self {
53        Self {
54            client: reqwest::Client::new(),
55            base_uri: base_uri.to_string(),
56            target_topic: None,
57            consumer_uri: None,
58            timeout_ms: 1000,
59        }
60    }
61
62    /// Helper util to rebuild and set the uri if a conflict exists
63    fn rebuild_consumer_uri(&self, group: &str, consumer_id: &str) -> String {
64        // "http://localhost:18082/consumers/demo-group/instances/consumer-1"
65        format!("{}/consumers/{group}/instances/{consumer_id}", self.base_uri)
66    }
67
68    /// Sets the consumer URI for this client.
69    ///
70    /// This is typically called internally after creating a consumer, but can be used
71    /// to manually set a consumer URI if needed.
72    ///
73    /// # Arguments
74    ///
75    /// * `uri` - The consumer instance URI to set
76    pub fn set_consumer_uri(&mut self, uri: &String) {
77        tracing::debug!("Setting consumer URI to {uri}");
78        self.consumer_uri = Some(uri.clone());
79    }
80
81    /// Sets the timeout duration for polling operations.
82    ///
83    /// # Arguments
84    ///
85    /// * `timeout_ms` - The timeout duration in milliseconds
86    ///
87    /// # Examples
88    ///
89    /// ```no_run
90    /// use kafka_http::KafkaHttpClient;
91    ///
92    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
93    /// client.set_timeout_ms(5000);  // Set 5 second timeout
94    /// ```
95    pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
96        tracing::debug!("Setting timeout to {timeout_ms} ms");
97        self.timeout_ms = timeout_ms;
98    }
99
100    /// Creates a new consumer in the specified consumer group.
101    ///
102    /// This method will fail if the consumer already exists. Use `try_create_consumer`
103    /// if you want to handle existing consumers gracefully.
104    ///
105    /// # Arguments
106    ///
107    /// * `group` - The consumer group name
108    /// * `params` - Parameters for creating the consumer (name, format, etc.)
109    ///
110    /// # Returns
111    ///
112    /// * `Ok(String)` - The consumer instance URI on success
113    /// * `Err(Error)` - An error if the consumer creation fails or URI is not returned
114    ///
115    /// # Examples
116    ///
117    /// ```no_run
118    /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
119    ///
120    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
121    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
122    /// let params = CreateConsumerParams {
123    ///     name: "consumer-1".to_string(),
124    ///     format: "json".to_string(),
125    ///     ..Default::default()
126    /// };
127    /// let consumer_uri = client.create_consumer("my-group", &params).await?;
128    /// # Ok(())
129    /// # }
130    /// ```
131    pub async fn create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> {
132        tracing::debug!("Creating consumer for group {group} - params: {params:?}");
133        let try_consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
134
135        let Some(consumer_uri) = try_consumer_uri else {
136            tracing::error!("failed to create consumer, consumer_uri is not set");
137            return Err(Error::new(InvalidInput, "failed to create consumer, consumer_uri is not set"));
138        };
139
140        tracing::debug!("setting stored consumer_uri: {consumer_uri}");
141        self.set_consumer_uri(&consumer_uri);
142        Ok(consumer_uri.clone())
143    }
144
145    /// Attempts to create a new consumer, handling the case where the consumer already exists.
146    ///
147    /// If the consumer already exists, this method will reconstruct the consumer URI based on
148    /// the group and consumer name, and return it wrapped in `Some`. This allows reconnecting
149    /// to existing consumers.
150    ///
151    /// # Arguments
152    ///
153    /// * `group` - The consumer group name
154    /// * `params` - Parameters for creating the consumer (name, format, etc.)
155    ///
156    /// # Returns
157    ///
158    /// * `Ok(Some(String))` - The consumer instance URI (new or reconstructed)
159    /// * `Ok(None)` - Should not occur in current implementation
160    /// * `Err(Error)` - An error if the operation fails
161    ///
162    /// # Examples
163    ///
164    /// ```no_run
165    /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
166    ///
167    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
168    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
169    /// let params = CreateConsumerParams {
170    ///     name: "consumer-1".to_string(),
171    ///     format: "json".to_string(),
172    ///     ..Default::default()
173    /// };
174    /// // This won't fail if consumer already exists
175    /// let consumer_uri = client.try_create_consumer("my-group", &params).await?;
176    /// # Ok(())
177    /// # }
178    /// ```
179    pub async fn try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> {
180        tracing::debug!("Creating consumer for group {group} - params: {params:?}");
181        let consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
182        match consumer_uri {
183            Some(new_uri) => {
184                tracing::debug!("setting stored consumer_uri: {new_uri}");
185                self.set_consumer_uri(&new_uri);
186                Ok(Some(new_uri))
187            }
188            None => {
189                tracing::debug!("consumer already exists");
190                let group = group.to_string();
191                let consumer_id = &params.name;
192                let built_consumer_uri = self.rebuild_consumer_uri(&group, consumer_id);
193
194                tracing::debug!("setting stored consumer_uri: {built_consumer_uri}");
195
196                self.set_consumer_uri(&built_consumer_uri);
197                Ok(self.consumer_uri.clone())
198            }
199        }
200    }
201
202    /// Subscribes the consumer to one or more topics.
203    ///
204    /// A consumer must be created before calling this method. The consumer URI must be set
205    /// (either by creating a consumer or manually setting it).
206    ///
207    /// # Arguments
208    ///
209    /// * `group` - The consumer group name (used for logging)
210    /// * `params` - Subscription parameters including topics to subscribe to
211    ///
212    /// # Returns
213    ///
214    /// * `Ok(())` - Subscription was successful
215    /// * `Err(Error)` - An error if consumer URI is not set or subscription fails
216    ///
217    /// # Examples    /// Polls for new records from subscribed topics.
218    ///
219    /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
220    /// created and subscribed to topics before polling.
221    ///
222    /// # Returns
223    ///
224    /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
225    /// * `Err(Error)` - An error if consumer URI is not set or polling fails
226    ///
227    /// # Examples
228    ///
229    /// ```no_run
230    /// use kafka_http::KafkaHttpClient;
231    ///
232    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
233    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
234    /// // ... create consumer and subscribe ...
235    /// let records = client.poll().await?;
236    /// for record in records {
237    ///     println!("Received: {:?}", record);
238    /// }
239    /// # Ok(())
240    /// # }
241    /// ```
242    ///
243    /// ```no_run
244    /// use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
245    ///
246    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
247    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
248    /// // ... create consumer first ...
249    /// let subscribe_params = SubscribeParams {
250    ///     topics: vec!["my-topic".to_string()],
251    ///     ..Default::default()
252    /// };
253    /// client.subscribe("my-group", &subscribe_params).await?;
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub async fn subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> {
258        let Some(consumer_uri) = &self.consumer_uri else {
259            tracing::error!("Consumer URI is not set");
260            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
261        };
262        tracing::debug!("Subscribing to {group} - params: {params:?}");
263        subscribe(&self.client, &consumer_uri, params).await
264    }
265
266    /// Polls for new records from subscribed topics.
267    ///
268    /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
269    /// created and subscribed to topics before polling.
270    ///
271    /// # Returns
272    ///
273    /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
274    /// * `Err(Error)` - An error if consumer URI is not set or polling fails
275    ///
276    /// # Examples
277    ///
278    /// ```no_run
279    /// use kafka_http::KafkaHttpClient;
280    ///
281    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
282    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
283    /// // ... create consumer and subscribe ...
284    /// let records = client.poll().awaait?;
285    /// for record in records {
286    ///     println!("Received: {:?}", record);
287    /// }
288    /// # Ok(())
289    /// # }
290    /// ```
291    pub async fn poll(&self) -> Result<Vec<Record>, Error> {
292        let Some(consumer_uri) = &self.consumer_uri else {
293            tracing::error!("Consumer URI is not set");
294            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
295        };
296        tracing::debug!("Polling");
297        poll(&self.client, &consumer_uri, self.timeout_ms).await
298    }
299
300    /// Produces records to a Kafka topic.
301    ///
302    /// # Arguments
303    ///
304    /// * `topic` - The name of the topic to produce to
305    /// * `params` - Parameters containing the records to produce
306    ///
307    /// # Returns
308    ///
309    /// * `Ok(())` - Records were successfully produced
310    /// * `Err(Error)` - An error if production fails
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
316    ///
317    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
318    /// let client = KafkaHttpClient::new("http://localhost:8082");
319    /// let params = ProduceParams {
320    ///     records: vec![/* records */],
321    ///     ..Default::default()
322    /// };
323    /// client.produce_to_topic("my-topic", &params).await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub async fn produce_to_topic(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> {
328        tracing::debug!("producing records");
329        produce(&self.client, &self.base_uri, topic, params).await
330    }
331
332    /// Sets the target topic for subsequent produce operations.
333    ///
334    /// This allows using the `produce()` method without specifying a topic each time.
335    /// The topic will be used for all subsequent calls to `produce()` until changed.
336    ///
337    /// # Arguments
338    ///
339    /// * `topic` - The name of the topic to use for future produce operations
340    ///
341    /// # Examples
342    ///
343    /// ```no_run
344    /// use kafka_http::KafkaHttpClient;
345    ///
346    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
347    /// client.set_target_topic("my-topic");
348    /// // Now can use produce() without specifying topic
349    /// ```
350    pub fn set_target_topic(&mut self, topic: &str) {
351        self.target_topic = Some(topic.to_string());
352    }
353
354    /// Produces records to the previously set target topic.
355    ///
356    /// This is a convenience method that uses the topic set via `set_target_topic()`.
357    /// If no target topic has been set, this method will return an error.
358    ///
359    /// # Arguments
360    ///
361    /// * `params` - Parameters containing the records to produce
362    ///
363    /// # Returns
364    ///
365    /// * `Ok(())` - Records were successfully produced
366    /// * `Err(Error)` - An error if target topic is not set or production fails
367    ///
368    /// # Examples
369    ///
370    /// ```no_run
371    /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
372    ///
373    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
374    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
375    /// client.set_target_topic("my-topic");
376    ///
377    /// let params = ProduceParams {
378    ///     records: vec![/* records */],
379    ///     ..Default::default()
380    /// };
381    /// client.produce(&params).await?;
382    /// # Ok(())
383    /// # }
384    /// ```
385    pub async fn produce(&self, params: &ProduceParams) -> Result<(), Error> {
386        let Some(topic) = &self.target_topic else {
387            tracing::error!("Target topic is not set");
388            return Err(Error::new(InvalidInput, "Target topic is not set"));
389        };
390        tracing::debug!("producing records to topic: {topic}");
391        produce(&self.client, &self.base_uri, topic, params).await
392    }
393    /// Commits offsets for consumed messages.
394    ///
395    /// A consumer must be created before calling this method. The consumer URI must be set.
396    ///
397    /// # Arguments
398    ///
399    /// * `params` - Parameters specifying which partition offsets to commit
400    ///
401    /// # Returns
402    ///
403    /// * `Ok(())` - Offsets were successfully committed
404    /// * `Err(Error)` - An error if consumer URI is not set or commit fails
405    ///
406    /// # Examples
407    ///
408    /// ```no_run
409    /// use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
410    ///
411    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
412    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
413    /// // ... create consumer, subscribe, and poll ...
414    /// let params = PartitionOffsetCommitParams {
415    ///     partitions: vec![/* partition offsets */],
416    ///     ..Default::default()
417    /// };
418    /// client.commit(&params).await?;
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> {
423        let Some(consumer_uri) = &self.consumer_uri else {
424            tracing::error!("Consumer URI is not set");
425            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
426        };
427        tracing::debug!("commiting offsets");
428        commit(&self.client, &consumer_uri, params).await
429    }
430}