kafka_http/lib.rs
1use reqwest::Client;
2use crate::api::{commit, create_consumer, poll, produce, subscribe};
3use crate::error::Error;
4use crate::error::ErrorType::InvalidInput;
5use crate::types::{CreateConsumerParams, PartitionOffsetCommitParams, ProduceParams, Record, SubscribeParams};
6
7pub(crate) mod api;
8pub mod error;
9pub mod types;
10
11
12/// A client for interacting with Kafka REST Proxy over HTTP.
13///
14/// This client provides methods for creating consumers, subscribing to topics,
15/// polling for records, producing messages, and committing offsets using the Kafka REST API.
16///
17/// # Examples
18///
19/// ```no_run
20/// use kafka_http::KafkaHttpClient;
21///
22/// let mut client = KafkaHttpClient::new("http://localhost:8082");
23/// client.set_timeout_ms(5000);
24/// ```
25#[derive(Debug, Clone)]
26pub struct KafkaHttpClient {
27 pub(crate) client: Client,
28 pub(crate) base_uri: String,
29 pub(crate) consumer_uri: Option<String>,
30 pub(crate) timeout_ms: u64,
31}
32
33impl KafkaHttpClient {
34 /// Creates a new `KafkaHttpClient` instance.
35 ///
36 /// # Arguments
37 ///
38 /// * `base_uri` - The base URI of the Kafka REST Proxy (e.g., "http://localhost:8082")
39 ///
40 /// # Returns
41 ///
42 /// A new `KafkaHttpClient` with default timeout of 1000ms
43 ///
44 /// # Examples
45 ///
46 /// ```no_run
47 /// use kafka_http::KafkaHttpClient;
48 ///
49 /// let client = KafkaHttpClient::new("http://localhost:8082");
50 /// ```
51 pub fn new(base_uri: &str) -> Self {
52 Self {
53 client: reqwest::Client::new(),
54 base_uri: base_uri.to_string(),
55 consumer_uri: None,
56 timeout_ms: 1000,
57 }
58 }
59
60 /// Helper util to rebuild and set the uri if a conflict exists
61 fn rebuild_consumer_uri(&self, group: &str, consumer_id: &str) -> String {
62 // "http://localhost:18082/consumers/demo-group/instances/consumer-1"
63 format!("{}/consumers/{group}/instances/{consumer_id}", self.base_uri)
64 }
65
66 /// Sets the consumer URI for this client.
67 ///
68 /// This is typically called internally after creating a consumer, but can be used
69 /// to manually set a consumer URI if needed.
70 ///
71 /// # Arguments
72 ///
73 /// * `uri` - The consumer instance URI to set
74 pub fn set_consumer_uri(&mut self, uri: &String) {
75 tracing::debug!("Setting consumer URI to {uri}");
76 self.consumer_uri = Some(uri.clone());
77 }
78
79 /// Sets the timeout duration for polling operations.
80 ///
81 /// # Arguments
82 ///
83 /// * `timeout_ms` - The timeout duration in milliseconds
84 ///
85 /// # Examples
86 ///
87 /// ```no_run
88 /// use kafka_http::KafkaHttpClient;
89 ///
90 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
91 /// client.set_timeout_ms(5000); // Set 5 second timeout
92 /// ```
93 pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
94 tracing::debug!("Setting timeout to {timeout_ms} ms");
95 self.timeout_ms = timeout_ms;
96 }
97
98 /// Creates a new consumer in the specified consumer group.
99 ///
100 /// This method will fail if the consumer already exists. Use `try_create_consumer`
101 /// if you want to handle existing consumers gracefully.
102 ///
103 /// # Arguments
104 ///
105 /// * `group` - The consumer group name
106 /// * `params` - Parameters for creating the consumer (name, format, etc.)
107 ///
108 /// # Returns
109 ///
110 /// * `Ok(String)` - The consumer instance URI on success
111 /// * `Err(Error)` - An error if the consumer creation fails or URI is not returned
112 ///
113 /// # Examples
114 ///
115 /// ```no_run
116 /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
117 ///
118 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
119 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
120 /// let params = CreateConsumerParams {
121 /// name: "consumer-1".to_string(),
122 /// format: "json".to_string(),
123 /// ..Default::default()
124 /// };
125 /// let consumer_uri = client.create_consumer("my-group", ¶ms).await?;
126 /// # Ok(())
127 /// # }
128 /// ```
129 pub async fn create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> {
130 tracing::debug!("Creating consumer for group {group} - params: {params:?}");
131 let try_consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
132
133 let Some(consumer_uri) = try_consumer_uri else {
134 tracing::error!("failed to create consumer, consumer_uri is not set");
135 return Err(Error::new(InvalidInput, "failed to create consumer, consumer_uri is not set"));
136 };
137
138 tracing::debug!("setting stored consumer_uri: {consumer_uri}");
139 self.set_consumer_uri(&consumer_uri);
140 Ok(consumer_uri.clone())
141 }
142
143
144 /// Attempts to create a new consumer, handling the case where the consumer already exists.
145 ///
146 /// If the consumer already exists, this method will reconstruct the consumer URI based on
147 /// the group and consumer name, and return it wrapped in `Some`. This allows reconnecting
148 /// to existing consumers.
149 ///
150 /// # Arguments
151 ///
152 /// * `group` - The consumer group name
153 /// * `params` - Parameters for creating the consumer (name, format, etc.)
154 ///
155 /// # Returns
156 ///
157 /// * `Ok(Some(String))` - The consumer instance URI (new or reconstructed)
158 /// * `Ok(None)` - Should not occur in current implementation
159 /// * `Err(Error)` - An error if the operation fails
160 ///
161 /// # Examples
162 ///
163 /// ```no_run
164 /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
165 ///
166 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
167 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
168 /// let params = CreateConsumerParams {
169 /// name: "consumer-1".to_string(),
170 /// format: "json".to_string(),
171 /// ..Default::default()
172 /// };
173 /// // This won't fail if consumer already exists
174 /// let consumer_uri = client.try_create_consumer("my-group", ¶ms).await?;
175 /// # Ok(())
176 /// # }
177 /// ```
178 pub async fn try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> {
179 tracing::debug!("Creating consumer for group {group} - params: {params:?}");
180 let consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
181 match consumer_uri {
182 Some(new_uri) => {
183 tracing::debug!("setting stored consumer_uri: {new_uri}");
184 self.set_consumer_uri(&new_uri);
185 Ok(Some(new_uri))
186 }
187 None => {
188 tracing::debug!("consumer already exists");
189 let group = group.to_string();
190 let consumer_id = ¶ms.name;
191 let built_consumer_uri = self.rebuild_consumer_uri(&group, consumer_id);
192
193 tracing::debug!("setting stored consumer_uri: {built_consumer_uri}");
194
195 self.set_consumer_uri(&built_consumer_uri);
196 Ok(self.consumer_uri.clone())
197 }
198 }
199 }
200
201 /// Subscribes the consumer to one or more topics.
202 ///
203 /// A consumer must be created before calling this method. The consumer URI must be set
204 /// (either by creating a consumer or manually setting it).
205 ///
206 /// # Arguments
207 ///
208 /// * `group` - The consumer group name (used for logging)
209 /// * `params` - Subscription parameters including topics to subscribe to
210 ///
211 /// # Returns
212 ///
213 /// * `Ok(())` - Subscription was successful
214 /// * `Err(Error)` - An error if consumer URI is not set or subscription fails
215 ///
216 /// # Examples /// Polls for new records from subscribed topics.
217 ///
218 /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
219 /// created and subscribed to topics before polling.
220 ///
221 /// # Returns
222 ///
223 /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
224 /// * `Err(Error)` - An error if consumer URI is not set or polling fails
225 ///
226 /// # Examples
227 ///
228 /// ```no_run
229 /// use kafka_http_client::KafkaHttpClient;
230 ///
231 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
232 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
233 /// // ... create consumer and subscribe ...
234 /// let records = client.poll().await?;
235 /// for record in records {
236 /// println!("Received: {:?}", record);
237 /// }
238 /// # Ok(())
239 /// # }
240 /// ```
241 ///
242 /// ```no_run
243 /// use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
244 ///
245 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
246 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
247 /// // ... create consumer first ...
248 /// let subscribe_params = SubscribeParams {
249 /// topics: vec!["my-topic".to_string()],
250 /// ..Default::default()
251 /// };
252 /// client.subscribe("my-group", &subscribe_params).await?;
253 /// # Ok(())
254 /// # }
255 /// ```
256 pub async fn subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> {
257 let Some(consumer_uri) = &self.consumer_uri else {
258 tracing::error!("Consumer URI is not set");
259 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
260 };
261 tracing::debug!("Subscribing to {group} - params: {params:?}");
262 subscribe(&self.client, &consumer_uri, params).await
263 }
264
265 /// Polls for new records from subscribed topics.
266 ///
267 /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
268 /// created and subscribed to topics before polling.
269 ///
270 /// # Returns
271 ///
272 /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
273 /// * `Err(Error)` - An error if consumer URI is not set or polling fails
274 ///
275 /// # Examples
276 ///
277 /// ```no_run
278 /// use kafka_http::KafkaHttpClient;
279 ///
280 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
281 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
282 /// // ... create consumer and subscribe ...
283 /// let records = client.poll().awaait?;
284 /// for record in records {
285 /// println!("Received: {:?}", record);
286 /// }
287 /// # Ok(())
288 /// # }
289 /// ```
290 pub async fn poll(&self) -> Result<Vec<Record>, Error> {
291 let Some(consumer_uri) = &self.consumer_uri else {
292 tracing::error!("Consumer URI is not set");
293 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
294 };
295 tracing::debug!("Polling");
296 poll(&self.client, &consumer_uri, self.timeout_ms).await
297 }
298
299 /// Produces records to a Kafka topic.
300 ///
301 /// # Arguments
302 ///
303 /// * `topic` - The name of the topic to produce to
304 /// * `params` - Parameters containing the records to produce
305 ///
306 /// # Returns
307 ///
308 /// * `Ok(())` - Records were successfully produced
309 /// * `Err(Error)` - An error if production fails
310 ///
311 /// # Examples
312 ///
313 /// ```no_run
314 /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
315 ///
316 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
317 /// let client = KafkaHttpClient::new("http://localhost:8082");
318 /// let params = ProduceParams {
319 /// records: vec![/* records */],
320 /// ..Default::default()
321 /// };
322 /// client.produce("my-topic", ¶ms).await?;
323 /// # Ok(())
324 /// # }
325 /// ```
326 pub async fn produce(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> {
327 tracing::debug!("producing records");
328 produce(&self.client, &self.base_uri, topic, params).await
329 }
330
331 /// Commits offsets for consumed messages.
332 ///
333 /// A consumer must be created before calling this method. The consumer URI must be set.
334 ///
335 /// # Arguments
336 ///
337 /// * `params` - Parameters specifying which partition offsets to commit
338 ///
339 /// # Returns
340 ///
341 /// * `Ok(())` - Offsets were successfully committed
342 /// * `Err(Error)` - An error if consumer URI is not set or commit fails
343 ///
344 /// # Examples
345 ///
346 /// ```no_run
347 /// use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
348 ///
349 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
350 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
351 /// // ... create consumer, subscribe, and poll ...
352 /// let params = PartitionOffsetCommitParams {
353 /// partitions: vec![/* partition offsets */],
354 /// ..Default::default()
355 /// };
356 /// client.commit(¶ms).await?;
357 /// # Ok(())
358 /// # }
359 /// ```
360 pub async fn commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> {
361 let Some(consumer_uri) = &self.consumer_uri else {
362 tracing::error!("Consumer URI is not set");
363 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
364 };
365 tracing::debug!("commiting offsets");
366 commit(&self.client, &consumer_uri, params).await
367 }
368}