kafka_http/lib.rs
1use reqwest::Client;
2use crate::api::{commit, create_consumer, poll, produce, subscribe};
3use crate::error::Error;
4use crate::error::ErrorType::InvalidInput;
5use crate::types::{CreateConsumerParams, PartitionOffsetCommitParams, ProduceParams, Record, SubscribeParams};
6
7pub(crate) mod api;
8pub mod error;
9pub mod types;
10
11
12/// A client for interacting with Kafka REST Proxy over HTTP.
13///
14/// This client provides methods for creating consumers, subscribing to topics,
15/// polling for records, producing messages, and committing offsets using the Kafka REST API.
16///
17/// # Examples
18///
19/// ```no_run
20/// use kafka_http::KafkaHttpClient;
21///
22/// let mut client = KafkaHttpClient::new("http://localhost:8082");
23/// client.set_timeout_ms(5000);
24/// ```
25#[derive(Debug, Clone)]
26pub struct KafkaHttpClient {
27 pub(crate) client: Client,
28 pub(crate) base_uri: String,
29 pub(crate) consumer_uri: Option<String>,
30 pub(crate) timeout_ms: u64,
31 pub(crate) target_topic: Option<String>
32}
33
34impl KafkaHttpClient {
35 /// Creates a new `KafkaHttpClient` instance.
36 ///
37 /// # Arguments
38 ///
39 /// * `base_uri` - The base URI of the Kafka REST Proxy (e.g., "http://localhost:8082")
40 ///
41 /// # Returns
42 ///
43 /// A new `KafkaHttpClient` with default timeout of 1000ms
44 ///
45 /// # Examples
46 ///
47 /// ```no_run
48 /// use kafka_http::KafkaHttpClient;
49 ///
50 /// let client = KafkaHttpClient::new("http://localhost:8082");
51 /// ```
52 pub fn new(base_uri: &str) -> Self {
53 Self {
54 client: reqwest::Client::new(),
55 base_uri: base_uri.to_string(),
56 target_topic: None,
57 consumer_uri: None,
58 timeout_ms: 1000,
59 }
60 }
61
62 /// Helper util to rebuild and set the uri if a conflict exists
63 fn rebuild_consumer_uri(&self, group: &str, consumer_id: &str) -> String {
64 // "http://localhost:18082/consumers/demo-group/instances/consumer-1"
65 format!("{}/consumers/{group}/instances/{consumer_id}", self.base_uri)
66 }
67
68 /// Sets the consumer URI for this client.
69 ///
70 /// This is typically called internally after creating a consumer, but can be used
71 /// to manually set a consumer URI if needed.
72 ///
73 /// # Arguments
74 ///
75 /// * `uri` - The consumer instance URI to set
76 pub fn set_consumer_uri(&mut self, uri: &String) {
77 tracing::debug!("Setting consumer URI to {uri}");
78 self.consumer_uri = Some(uri.clone());
79 }
80
81 /// Sets the timeout duration for polling operations.
82 ///
83 /// # Arguments
84 ///
85 /// * `timeout_ms` - The timeout duration in milliseconds
86 ///
87 /// # Examples
88 ///
89 /// ```no_run
90 /// use kafka_http::KafkaHttpClient;
91 ///
92 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
93 /// client.set_timeout_ms(5000); // Set 5 second timeout
94 /// ```
95 pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
96 tracing::debug!("Setting timeout to {timeout_ms} ms");
97 self.timeout_ms = timeout_ms;
98 }
99
100 /// Creates a new consumer in the specified consumer group.
101 ///
102 /// This method will fail if the consumer already exists. Use `try_create_consumer`
103 /// if you want to handle existing consumers gracefully.
104 ///
105 /// # Arguments
106 ///
107 /// * `group` - The consumer group name
108 /// * `params` - Parameters for creating the consumer (name, format, etc.)
109 ///
110 /// # Returns
111 ///
112 /// * `Ok(String)` - The consumer instance URI on success
113 /// * `Err(Error)` - An error if the consumer creation fails or URI is not returned
114 ///
115 /// # Examples
116 ///
117 /// ```no_run
118 /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
119 ///
120 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
121 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
122 /// let params = CreateConsumerParams {
123 /// name: "consumer-1".to_string(),
124 /// format: "json".to_string(),
125 /// ..Default::default()
126 /// };
127 /// let consumer_uri = client.create_consumer("my-group", ¶ms).await?;
128 /// # Ok(())
129 /// # }
130 /// ```
131 pub async fn create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> {
132 tracing::debug!("Creating consumer for group {group} - params: {params:?}");
133 let try_consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
134
135 let Some(consumer_uri) = try_consumer_uri else {
136 tracing::error!("failed to create consumer, consumer_uri is not set");
137 return Err(Error::new(InvalidInput, "failed to create consumer, consumer_uri is not set"));
138 };
139
140 tracing::debug!("setting stored consumer_uri: {consumer_uri}");
141 self.set_consumer_uri(&consumer_uri);
142 Ok(consumer_uri.clone())
143 }
144
145 /// Attempts to create a new consumer, handling the case where the consumer already exists.
146 ///
147 /// If the consumer already exists, this method will reconstruct the consumer URI based on
148 /// the group and consumer name, and return it wrapped in `Some`. This allows reconnecting
149 /// to existing consumers.
150 ///
151 /// # Arguments
152 ///
153 /// * `group` - The consumer group name
154 /// * `params` - Parameters for creating the consumer (name, format, etc.)
155 ///
156 /// # Returns
157 ///
158 /// * `Ok(Some(String))` - The consumer instance URI (new or reconstructed)
159 /// * `Ok(None)` - Should not occur in current implementation
160 /// * `Err(Error)` - An error if the operation fails
161 ///
162 /// # Examples
163 ///
164 /// ```no_run
165 /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
166 ///
167 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
168 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
169 /// let params = CreateConsumerParams {
170 /// name: "consumer-1".to_string(),
171 /// format: "json".to_string(),
172 /// ..Default::default()
173 /// };
174 /// // This won't fail if consumer already exists
175 /// let consumer_uri = client.try_create_consumer("my-group", ¶ms).await?;
176 /// # Ok(())
177 /// # }
178 /// ```
179 pub async fn try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> {
180 tracing::debug!("Creating consumer for group {group} - params: {params:?}");
181 let consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
182 match consumer_uri {
183 Some(new_uri) => {
184 tracing::debug!("setting stored consumer_uri: {new_uri}");
185 self.set_consumer_uri(&new_uri);
186 Ok(Some(new_uri))
187 }
188 None => {
189 tracing::debug!("consumer already exists");
190 let group = group.to_string();
191 let consumer_id = ¶ms.name;
192 let built_consumer_uri = self.rebuild_consumer_uri(&group, consumer_id);
193
194 tracing::debug!("setting stored consumer_uri: {built_consumer_uri}");
195
196 self.set_consumer_uri(&built_consumer_uri);
197 Ok(self.consumer_uri.clone())
198 }
199 }
200 }
201
202 /// Subscribes the consumer to one or more topics.
203 ///
204 /// A consumer must be created before calling this method. The consumer URI must be set
205 /// (either by creating a consumer or manually setting it).
206 ///
207 /// # Arguments
208 ///
209 /// * `group` - The consumer group name (used for logging)
210 /// * `params` - Subscription parameters including topics to subscribe to
211 ///
212 /// # Returns
213 ///
214 /// * `Ok(())` - Subscription was successful
215 /// * `Err(Error)` - An error if consumer URI is not set or subscription fails
216 ///
217 /// # Examples /// Polls for new records from subscribed topics.
218 ///
219 /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
220 /// created and subscribed to topics before polling.
221 ///
222 /// # Returns
223 ///
224 /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
225 /// * `Err(Error)` - An error if consumer URI is not set or polling fails
226 ///
227 /// # Examples
228 ///
229 /// ```no_run
230 /// use kafka_http::KafkaHttpClient;
231 ///
232 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
233 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
234 /// // ... create consumer and subscribe ...
235 /// let records = client.poll().await?;
236 /// for record in records {
237 /// println!("Received: {:?}", record);
238 /// }
239 /// # Ok(())
240 /// # }
241 /// ```
242 ///
243 /// ```no_run
244 /// use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
245 ///
246 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
247 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
248 /// // ... create consumer first ...
249 /// let subscribe_params = SubscribeParams {
250 /// topics: vec!["my-topic".to_string()],
251 /// ..Default::default()
252 /// };
253 /// client.subscribe("my-group", &subscribe_params).await?;
254 /// # Ok(())
255 /// # }
256 /// ```
257 pub async fn subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> {
258 let Some(consumer_uri) = &self.consumer_uri else {
259 tracing::error!("Consumer URI is not set");
260 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
261 };
262 tracing::debug!("Subscribing to {group} - params: {params:?}");
263 subscribe(&self.client, &consumer_uri, params).await
264 }
265
266 /// Polls for new records from subscribed topics.
267 ///
268 /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
269 /// created and subscribed to topics before polling.
270 ///
271 /// # Returns
272 ///
273 /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
274 /// * `Err(Error)` - An error if consumer URI is not set or polling fails
275 ///
276 /// # Examples
277 ///
278 /// ```no_run
279 /// use kafka_http::KafkaHttpClient;
280 ///
281 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
282 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
283 /// // ... create consumer and subscribe ...
284 /// let records = client.poll().awaait?;
285 /// for record in records {
286 /// println!("Received: {:?}", record);
287 /// }
288 /// # Ok(())
289 /// # }
290 /// ```
291 pub async fn poll(&self) -> Result<Vec<Record>, Error> {
292 let Some(consumer_uri) = &self.consumer_uri else {
293 tracing::error!("Consumer URI is not set");
294 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
295 };
296 tracing::debug!("Polling");
297 poll(&self.client, &consumer_uri, self.timeout_ms).await
298 }
299
300 /// Produces records to a Kafka topic.
301 ///
302 /// # Arguments
303 ///
304 /// * `topic` - The name of the topic to produce to
305 /// * `params` - Parameters containing the records to produce
306 ///
307 /// # Returns
308 ///
309 /// * `Ok(())` - Records were successfully produced
310 /// * `Err(Error)` - An error if production fails
311 ///
312 /// # Examples
313 ///
314 /// ```no_run
315 /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
316 ///
317 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
318 /// let client = KafkaHttpClient::new("http://localhost:8082");
319 /// let params = ProduceParams {
320 /// records: vec![/* records */],
321 /// ..Default::default()
322 /// };
323 /// client.produce_to_topic("my-topic", ¶ms).await?;
324 /// # Ok(())
325 /// # }
326 /// ```
327 pub async fn produce_to_topic(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> {
328 tracing::debug!("producing records");
329 produce(&self.client, &self.base_uri, topic, params).await
330 }
331
332 /// Sets the target topic for subsequent produce operations.
333 ///
334 /// This allows using the `produce()` method without specifying a topic each time.
335 /// The topic will be used for all subsequent calls to `produce()` until changed.
336 ///
337 /// # Arguments
338 ///
339 /// * `topic` - The name of the topic to use for future produce operations
340 ///
341 /// # Examples
342 ///
343 /// ```no_run
344 /// use kafka_http::KafkaHttpClient;
345 ///
346 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
347 /// client.set_target_topic("my-topic");
348 /// // Now can use produce() without specifying topic
349 /// ```
350 pub fn set_target_topic(&mut self, topic: &str) {
351 self.target_topic = Some(topic.to_string());
352 }
353
354 /// Produces records to the previously set target topic.
355 ///
356 /// This is a convenience method that uses the topic set via `set_target_topic()`.
357 /// If no target topic has been set, this method will return an error.
358 ///
359 /// # Arguments
360 ///
361 /// * `params` - Parameters containing the records to produce
362 ///
363 /// # Returns
364 ///
365 /// * `Ok(())` - Records were successfully produced
366 /// * `Err(Error)` - An error if target topic is not set or production fails
367 ///
368 /// # Examples
369 ///
370 /// ```no_run
371 /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
372 ///
373 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
374 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
375 /// client.set_target_topic("my-topic");
376 ///
377 /// let params = ProduceParams {
378 /// records: vec![/* records */],
379 /// ..Default::default()
380 /// };
381 /// client.produce(¶ms).await?;
382 /// # Ok(())
383 /// # }
384 /// ```
385 pub async fn produce(&self, params: &ProduceParams) -> Result<(), Error> {
386 let Some(topic) = &self.target_topic else {
387 tracing::error!("Target topic is not set");
388 return Err(Error::new(InvalidInput, "Target topic is not set"));
389 };
390 tracing::debug!("producing records to topic: {topic}");
391 produce(&self.client, &self.base_uri, topic, params).await
392 }
393 /// Commits offsets for consumed messages.
394 ///
395 /// A consumer must be created before calling this method. The consumer URI must be set.
396 ///
397 /// # Arguments
398 ///
399 /// * `params` - Parameters specifying which partition offsets to commit
400 ///
401 /// # Returns
402 ///
403 /// * `Ok(())` - Offsets were successfully committed
404 /// * `Err(Error)` - An error if consumer URI is not set or commit fails
405 ///
406 /// # Examples
407 ///
408 /// ```no_run
409 /// use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
410 ///
411 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
412 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
413 /// // ... create consumer, subscribe, and poll ...
414 /// let params = PartitionOffsetCommitParams {
415 /// partitions: vec![/* partition offsets */],
416 /// ..Default::default()
417 /// };
418 /// client.commit(¶ms).await?;
419 /// # Ok(())
420 /// # }
421 /// ```
422 pub async fn commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> {
423 let Some(consumer_uri) = &self.consumer_uri else {
424 tracing::error!("Consumer URI is not set");
425 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
426 };
427 tracing::debug!("commiting offsets");
428 commit(&self.client, &consumer_uri, params).await
429 }
430}