kafka_http/
lib.rs

1use reqwest::Client;
2use crate::api::{commit, create_consumer, poll, produce, subscribe};
3use crate::error::Error;
4use crate::error::ErrorType::InvalidInput;
5use crate::types::{CreateConsumerParams, PartitionOffsetCommitParams, ProduceParams, Record, SubscribeParams};
6
7pub(crate) mod api;
8pub mod error;
9pub mod types;
10
11
12/// A client for interacting with Kafka REST Proxy over HTTP.
13///
14/// This client provides methods for creating consumers, subscribing to topics,
15/// polling for records, producing messages, and committing offsets using the Kafka REST API.
16///
17/// # Examples
18///
19/// ```no_run
20/// use kafka_http::KafkaHttpClient;
21///
22/// let mut client = KafkaHttpClient::new("http://localhost:8082");
23/// client.set_timeout_ms(5000);
24/// ```
25#[derive(Debug, Clone)]
26pub struct KafkaHttpClient {
27    pub(crate) client: Client,
28    pub(crate) base_uri: String,
29    pub(crate) consumer_uri: Option<String>,
30    pub(crate) timeout_ms: u64,
31}
32
33impl KafkaHttpClient {
34    /// Creates a new `KafkaHttpClient` instance.
35    ///
36    /// # Arguments
37    ///
38    /// * `base_uri` - The base URI of the Kafka REST Proxy (e.g., "http://localhost:8082")
39    ///
40    /// # Returns
41    ///
42    /// A new `KafkaHttpClient` with default timeout of 1000ms
43    ///
44    /// # Examples
45    ///
46    /// ```no_run
47    /// use kafka_http::KafkaHttpClient;
48    ///
49    /// let client = KafkaHttpClient::new("http://localhost:8082");
50    /// ```
51    pub fn new(base_uri: &str) -> Self {
52        Self {
53            client: reqwest::Client::new(),
54            base_uri: base_uri.to_string(),
55            consumer_uri: None,
56            timeout_ms: 1000,
57        }
58    }
59
60    /// Helper util to rebuild and set the uri if a conflict exists
61    fn rebuild_consumer_uri(&self, group: &str, consumer_id: &str) -> String {
62        // "http://localhost:18082/consumers/demo-group/instances/consumer-1"
63        format!("{}/consumers/{group}/instances/{consumer_id}", self.base_uri)
64    }
65
66    /// Sets the consumer URI for this client.
67    ///
68    /// This is typically called internally after creating a consumer, but can be used
69    /// to manually set a consumer URI if needed.
70    ///
71    /// # Arguments
72    ///
73    /// * `uri` - The consumer instance URI to set
74    pub fn set_consumer_uri(&mut self, uri: &String) {
75        tracing::debug!("Setting consumer URI to {uri}");
76        self.consumer_uri = Some(uri.clone());
77    }
78
79    /// Sets the timeout duration for polling operations.
80    ///
81    /// # Arguments
82    ///
83    /// * `timeout_ms` - The timeout duration in milliseconds
84    ///
85    /// # Examples
86    ///
87    /// ```no_run
88    /// use kafka_http::KafkaHttpClient;
89    ///
90    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
91    /// client.set_timeout_ms(5000);  // Set 5 second timeout
92    /// ```
93    pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
94        tracing::debug!("Setting timeout to {timeout_ms} ms");
95        self.timeout_ms = timeout_ms;
96    }
97
98    /// Creates a new consumer in the specified consumer group.
99    ///
100    /// This method will fail if the consumer already exists. Use `try_create_consumer`
101    /// if you want to handle existing consumers gracefully.
102    ///
103    /// # Arguments
104    ///
105    /// * `group` - The consumer group name
106    /// * `params` - Parameters for creating the consumer (name, format, etc.)
107    ///
108    /// # Returns
109    ///
110    /// * `Ok(String)` - The consumer instance URI on success
111    /// * `Err(Error)` - An error if the consumer creation fails or URI is not returned
112    ///
113    /// # Examples
114    ///
115    /// ```no_run
116    /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
117    ///
118    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
119    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
120    /// let params = CreateConsumerParams {
121    ///     name: "consumer-1".to_string(),
122    ///     format: "json".to_string(),
123    ///     ..Default::default()
124    /// };
125    /// let consumer_uri = client.create_consumer("my-group", &params).await?;
126    /// # Ok(())
127    /// # }
128    /// ```
129    pub async fn create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> {
130        tracing::debug!("Creating consumer for group {group} - params: {params:?}");
131        let try_consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
132
133        let Some(consumer_uri) = try_consumer_uri else {
134            tracing::error!("failed to create consumer, consumer_uri is not set");
135            return Err(Error::new(InvalidInput, "failed to create consumer, consumer_uri is not set"));
136        };
137
138        tracing::debug!("setting stored consumer_uri: {consumer_uri}");
139        self.set_consumer_uri(&consumer_uri);
140        Ok(consumer_uri.clone())
141    }
142
143
144    /// Attempts to create a new consumer, handling the case where the consumer already exists.
145    ///
146    /// If the consumer already exists, this method will reconstruct the consumer URI based on
147    /// the group and consumer name, and return it wrapped in `Some`. This allows reconnecting
148    /// to existing consumers.
149    ///
150    /// # Arguments
151    ///
152    /// * `group` - The consumer group name
153    /// * `params` - Parameters for creating the consumer (name, format, etc.)
154    ///
155    /// # Returns
156    ///
157    /// * `Ok(Some(String))` - The consumer instance URI (new or reconstructed)
158    /// * `Ok(None)` - Should not occur in current implementation
159    /// * `Err(Error)` - An error if the operation fails
160    ///
161    /// # Examples
162    ///
163    /// ```no_run
164    /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
165    ///
166    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
167    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
168    /// let params = CreateConsumerParams {
169    ///     name: "consumer-1".to_string(),
170    ///     format: "json".to_string(),
171    ///     ..Default::default()
172    /// };
173    /// // This won't fail if consumer already exists
174    /// let consumer_uri = client.try_create_consumer("my-group", &params).await?;
175    /// # Ok(())
176    /// # }
177    /// ```
178    pub async fn try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> {
179        tracing::debug!("Creating consumer for group {group} - params: {params:?}");
180        let consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
181        match consumer_uri {
182            Some(new_uri) => {
183                tracing::debug!("setting stored consumer_uri: {new_uri}");
184                self.set_consumer_uri(&new_uri);
185                Ok(Some(new_uri))
186            }
187            None => {
188                tracing::debug!("consumer already exists");
189                let group = group.to_string();
190                let consumer_id = &params.name;
191                let built_consumer_uri = self.rebuild_consumer_uri(&group, consumer_id);
192
193                tracing::debug!("setting stored consumer_uri: {built_consumer_uri}");
194
195                self.set_consumer_uri(&built_consumer_uri);
196                Ok(self.consumer_uri.clone())
197            }
198        }
199    }
200
201    /// Subscribes the consumer to one or more topics.
202    ///
203    /// A consumer must be created before calling this method. The consumer URI must be set
204    /// (either by creating a consumer or manually setting it).
205    ///
206    /// # Arguments
207    ///
208    /// * `group` - The consumer group name (used for logging)
209    /// * `params` - Subscription parameters including topics to subscribe to
210    ///
211    /// # Returns
212    ///
213    /// * `Ok(())` - Subscription was successful
214    /// * `Err(Error)` - An error if consumer URI is not set or subscription fails
215    ///
216    /// # Examples    /// Polls for new records from subscribed topics.
217    ///
218    /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
219    /// created and subscribed to topics before polling.
220    ///
221    /// # Returns
222    ///
223    /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
224    /// * `Err(Error)` - An error if consumer URI is not set or polling fails
225    ///
226    /// # Examples
227    ///
228    /// ```no_run
229    /// use kafka_http_client::KafkaHttpClient;
230    ///
231    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
232    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
233    /// // ... create consumer and subscribe ...
234    /// let records = client.poll().await?;
235    /// for record in records {
236    ///     println!("Received: {:?}", record);
237    /// }
238    /// # Ok(())
239    /// # }
240    /// ```
241    ///
242    /// ```no_run
243    /// use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
244    ///
245    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
246    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
247    /// // ... create consumer first ...
248    /// let subscribe_params = SubscribeParams {
249    ///     topics: vec!["my-topic".to_string()],
250    ///     ..Default::default()
251    /// };
252    /// client.subscribe("my-group", &subscribe_params).await?;
253    /// # Ok(())
254    /// # }
255    /// ```
256    pub async fn subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> {
257        let Some(consumer_uri) = &self.consumer_uri else {
258            tracing::error!("Consumer URI is not set");
259            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
260        };
261        tracing::debug!("Subscribing to {group} - params: {params:?}");
262        subscribe(&self.client, &consumer_uri, params).await
263    }
264
265    /// Polls for new records from subscribed topics.
266    ///
267    /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
268    /// created and subscribed to topics before polling.
269    ///
270    /// # Returns
271    ///
272    /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
273    /// * `Err(Error)` - An error if consumer URI is not set or polling fails
274    ///
275    /// # Examples
276    ///
277    /// ```no_run
278    /// use kafka_http::KafkaHttpClient;
279    ///
280    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
281    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
282    /// // ... create consumer and subscribe ...
283    /// let records = client.poll().awaait?;
284    /// for record in records {
285    ///     println!("Received: {:?}", record);
286    /// }
287    /// # Ok(())
288    /// # }
289    /// ```
290    pub async fn poll(&self) -> Result<Vec<Record>, Error> {
291        let Some(consumer_uri) = &self.consumer_uri else {
292            tracing::error!("Consumer URI is not set");
293            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
294        };
295        tracing::debug!("Polling");
296        poll(&self.client, &consumer_uri, self.timeout_ms).await
297    }
298
299    /// Produces records to a Kafka topic.
300    ///
301    /// # Arguments
302    ///
303    /// * `topic` - The name of the topic to produce to
304    /// * `params` - Parameters containing the records to produce
305    ///
306    /// # Returns
307    ///
308    /// * `Ok(())` - Records were successfully produced
309    /// * `Err(Error)` - An error if production fails
310    ///
311    /// # Examples
312    ///
313    /// ```no_run
314    /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
315    ///
316    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
317    /// let client = KafkaHttpClient::new("http://localhost:8082");
318    /// let params = ProduceParams {
319    ///     records: vec![/* records */],
320    ///     ..Default::default()
321    /// };
322    /// client.produce("my-topic", &params).await?;
323    /// # Ok(())
324    /// # }
325    /// ```
326    pub async fn produce(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> {
327        tracing::debug!("producing records");
328        produce(&self.client, &self.base_uri, topic, params).await
329    }
330
331    /// Commits offsets for consumed messages.
332    ///
333    /// A consumer must be created before calling this method. The consumer URI must be set.
334    ///
335    /// # Arguments
336    ///
337    /// * `params` - Parameters specifying which partition offsets to commit
338    ///
339    /// # Returns
340    ///
341    /// * `Ok(())` - Offsets were successfully committed
342    /// * `Err(Error)` - An error if consumer URI is not set or commit fails
343    ///
344    /// # Examples
345    ///
346    /// ```no_run
347    /// use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
348    ///
349    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
350    /// let mut client = KafkaHttpClient::new("http://localhost:8082");
351    /// // ... create consumer, subscribe, and poll ...
352    /// let params = PartitionOffsetCommitParams {
353    ///     partitions: vec![/* partition offsets */],
354    ///     ..Default::default()
355    /// };
356    /// client.commit(&params).await?;
357    /// # Ok(())
358    /// # }
359    /// ```
360    pub async fn commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> {
361        let Some(consumer_uri) = &self.consumer_uri else {
362            tracing::error!("Consumer URI is not set");
363            return Err(Error::new(InvalidInput, "Consumer URI is not set"));
364        };
365        tracing::debug!("commiting offsets");
366        commit(&self.client, &consumer_uri, params).await
367    }
368}