kafka_http/lib.rs
1use reqwest::{header, Client};
2use crate::api::{commit, create_consumer, poll, produce, subscribe};
3use crate::error::Error;
4use crate::error::ErrorType::InvalidInput;
5use crate::types::{CreateConsumerParams, PartitionOffsetCommitParams, ProduceParams, Record, SubscribeParams};
6use base64::{engine::general_purpose, Engine as _};
7
8pub(crate) mod api;
9pub mod error;
10pub mod types;
11
12
13/// A client for interacting with Kafka REST Proxy over HTTP.
14///
15/// This client provides methods for creating consumers, subscribing to topics,
16/// polling for records, producing messages, and committing offsets using the Kafka REST API.
17///
18/// # Examples
19///
20/// ```no_run
21/// use kafka_http::KafkaHttpClient;
22///
23/// let mut client = KafkaHttpClient::new("http://localhost:8082");
24/// client.set_timeout_ms(5000);
25/// ```
26#[derive(Debug, Clone)]
27pub struct KafkaHttpClient {
28 pub(crate) client: Client,
29 pub(crate) base_uri: String,
30 pub(crate) consumer_uri: Option<String>,
31 pub(crate) timeout_ms: u64,
32 pub(crate) target_topic: Option<String>
33}
34
35impl KafkaHttpClient {
36 /// Creates a new `KafkaHttpClient` instance.
37 ///
38 /// # Arguments
39 ///
40 /// * `base_uri` - The base URI of the Kafka REST Proxy (e.g., "http://localhost:8082")
41 ///
42 /// # Returns
43 ///
44 /// A new `KafkaHttpClient` with default timeout of 1000ms
45 ///
46 /// # Examples
47 ///
48 /// ```no_run
49 /// use kafka_http::KafkaHttpClient;
50 ///
51 /// let client = KafkaHttpClient::new("http://localhost:8082");
52 /// ```
53 pub fn new(base_uri: &str) -> Self {
54 Self {
55 client: reqwest::Client::new(),
56 base_uri: base_uri.to_string(),
57 target_topic: None,
58 consumer_uri: None,
59 timeout_ms: 1000,
60 }
61 }
62
63 /// Helper util to rebuild and set the uri if a conflict exists
64 fn rebuild_consumer_uri(&self, group: &str, consumer_id: &str) -> String {
65 // "http://localhost:18082/consumers/demo-group/instances/consumer-1"
66 format!("{}/consumers/{group}/instances/{consumer_id}", self.base_uri)
67 }
68
69
70 /// Sets HTTP Basic Authentication credentials for the client.
71 ///
72 /// This method configures the client to use HTTP Basic Authentication by encoding
73 /// the provided username and password and adding them as a default Authorization
74 /// header to all subsequent requests.
75 ///
76 /// # Arguments
77 ///
78 /// * `username` - The username for Basic Authentication
79 /// * `password` - The password for Basic Authentication
80 ///
81 /// # Returns
82 ///
83 /// * `Ok(())` - Authentication was successfully configured
84 /// * `Err(Error)` - An error if the client could not be rebuilt with the new headers
85 ///
86 /// # Examples
87 ///
88 /// ```no_run
89 /// use kafka_http::KafkaHttpClient;
90 ///
91 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
92 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
93 /// client.set_basic_auth("my-username", "my-password")?;
94 /// // All subsequent requests will include the Basic Auth header
95 /// # Ok(())
96 /// # }
97 /// ```
98 pub fn set_basic_auth(&mut self, username: &str, password: &str) -> Result<(), Error> {
99 // Create a new client with default headers the provided auth params
100 let auth_str = format!("{}:{}", username, password);
101 let encoded = general_purpose::STANDARD.encode(auth_str);
102
103 let mut headers = header::HeaderMap::new();
104 headers.insert(
105 header::AUTHORIZATION,
106 header::HeaderValue::from_str(&format!("Basic {}", encoded)).unwrap(),
107 );
108
109 // Replace the client with one that has the default header
110 let new_client = Client::builder()
111 .default_headers(headers)
112 .build()?;
113
114 self.client = new_client;
115 Ok(())
116 }
117
118 /// Sets the consumer URI for this client.
119 ///
120 /// This is typically called internally after creating a consumer, but can be used
121 /// to manually set a consumer URI if needed.
122 ///
123 /// # Arguments
124 ///
125 /// * `uri` - The consumer instance URI to set
126 pub fn set_consumer_uri(&mut self, uri: &String) {
127 tracing::debug!("Setting consumer URI to {uri}");
128 self.consumer_uri = Some(uri.clone());
129 }
130
131 /// Sets the timeout duration for polling operations.
132 ///
133 /// # Arguments
134 ///
135 /// * `timeout_ms` - The timeout duration in milliseconds
136 ///
137 /// # Examples
138 ///
139 /// ```no_run
140 /// use kafka_http::KafkaHttpClient;
141 ///
142 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
143 /// client.set_timeout_ms(5000); // Set 5 second timeout
144 /// ```
145 pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
146 tracing::debug!("Setting timeout to {timeout_ms} ms");
147 self.timeout_ms = timeout_ms;
148 }
149
150 /// Creates a new consumer in the specified consumer group.
151 ///
152 /// This method will fail if the consumer already exists. Use `try_create_consumer`
153 /// if you want to handle existing consumers gracefully.
154 ///
155 /// # Arguments
156 ///
157 /// * `group` - The consumer group name
158 /// * `params` - Parameters for creating the consumer (name, format, etc.)
159 ///
160 /// # Returns
161 ///
162 /// * `Ok(String)` - The consumer instance URI on success
163 /// * `Err(Error)` - An error if the consumer creation fails or URI is not returned
164 ///
165 /// # Examples
166 ///
167 /// ```no_run
168 /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
169 ///
170 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
171 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
172 /// let params = CreateConsumerParams {
173 /// name: "consumer-1".to_string(),
174 /// format: "json".to_string(),
175 /// ..Default::default()
176 /// };
177 /// let consumer_uri = client.create_consumer("my-group", ¶ms).await?;
178 /// # Ok(())
179 /// # }
180 /// ```
181 pub async fn create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> {
182 tracing::debug!("Creating consumer for group {group} - params: {params:?}");
183 let try_consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
184
185 let Some(consumer_uri) = try_consumer_uri else {
186 tracing::error!("failed to create consumer, consumer_uri is not set");
187 return Err(Error::new(InvalidInput, "failed to create consumer, consumer_uri is not set"));
188 };
189
190 tracing::debug!("setting stored consumer_uri: {consumer_uri}");
191 self.set_consumer_uri(&consumer_uri);
192 Ok(consumer_uri.clone())
193 }
194
195 /// Attempts to create a new consumer, handling the case where the consumer already exists.
196 ///
197 /// If the consumer already exists, this method will reconstruct the consumer URI based on
198 /// the group and consumer name, and return it wrapped in `Some`. This allows reconnecting
199 /// to existing consumers.
200 ///
201 /// # Arguments
202 ///
203 /// * `group` - The consumer group name
204 /// * `params` - Parameters for creating the consumer (name, format, etc.)
205 ///
206 /// # Returns
207 ///
208 /// * `Ok(Some(String))` - The consumer instance URI (new or reconstructed)
209 /// * `Ok(None)` - Should not occur in current implementation
210 /// * `Err(Error)` - An error if the operation fails
211 ///
212 /// # Examples
213 ///
214 /// ```no_run
215 /// use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
216 ///
217 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
218 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
219 /// let params = CreateConsumerParams {
220 /// name: "consumer-1".to_string(),
221 /// format: "json".to_string(),
222 /// ..Default::default()
223 /// };
224 /// // This won't fail if consumer already exists
225 /// let consumer_uri = client.try_create_consumer("my-group", ¶ms).await?;
226 /// # Ok(())
227 /// # }
228 /// ```
229 pub async fn try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> {
230 tracing::debug!("Creating consumer for group {group} - params: {params:?}");
231 let consumer_uri = create_consumer(&self.client, &self.base_uri, group, params).await?;
232 match consumer_uri {
233 Some(new_uri) => {
234 tracing::debug!("setting stored consumer_uri: {new_uri}");
235 self.set_consumer_uri(&new_uri);
236 Ok(Some(new_uri))
237 }
238 None => {
239 tracing::debug!("consumer already exists");
240 let group = group.to_string();
241 let consumer_id = ¶ms.name;
242 let built_consumer_uri = self.rebuild_consumer_uri(&group, consumer_id);
243
244 tracing::debug!("setting stored consumer_uri: {built_consumer_uri}");
245
246 self.set_consumer_uri(&built_consumer_uri);
247 Ok(self.consumer_uri.clone())
248 }
249 }
250 }
251
252 /// Subscribes the consumer to one or more topics.
253 ///
254 /// A consumer must be created before calling this method. The consumer URI must be set
255 /// (either by creating a consumer or manually setting it).
256 ///
257 /// # Arguments
258 ///
259 /// * `group` - The consumer group name (used for logging)
260 /// * `params` - Subscription parameters including topics to subscribe to
261 ///
262 /// # Returns
263 ///
264 /// * `Ok(())` - Subscription was successful
265 /// * `Err(Error)` - An error if consumer URI is not set or subscription fails
266 ///
267 /// # Examples /// Polls for new records from subscribed topics.
268 ///
269 /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
270 /// created and subscribed to topics before polling.
271 ///
272 /// # Returns
273 ///
274 /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
275 /// * `Err(Error)` - An error if consumer URI is not set or polling fails
276 ///
277 /// # Examples
278 ///
279 /// ```no_run
280 /// use kafka_http::KafkaHttpClient;
281 ///
282 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
283 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
284 /// // ... create consumer and subscribe ...
285 /// let records = client.poll().await?;
286 /// for record in records {
287 /// println!("Received: {:?}", record);
288 /// }
289 /// # Ok(())
290 /// # }
291 /// ```
292 ///
293 /// ```no_run
294 /// use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
295 ///
296 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
297 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
298 /// // ... create consumer first ...
299 /// let subscribe_params = SubscribeParams {
300 /// topics: vec!["my-topic".to_string()],
301 /// ..Default::default()
302 /// };
303 /// client.subscribe("my-group", &subscribe_params).await?;
304 /// # Ok(())
305 /// # }
306 /// ```
307 pub async fn subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> {
308 let Some(consumer_uri) = &self.consumer_uri else {
309 tracing::error!("Consumer URI is not set");
310 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
311 };
312 tracing::debug!("Subscribing to {group} - params: {params:?}");
313 subscribe(&self.client, &consumer_uri, params).await
314 }
315
316 /// Polls for new records from subscribed topics.
317 ///
318 /// This method uses the timeout configured via `set_timeout_ms()`. A consumer must be
319 /// created and subscribed to topics before polling.
320 ///
321 /// # Returns
322 ///
323 /// * `Ok(Vec<Record>)` - A vector of records (may be empty if no records available)
324 /// * `Err(Error)` - An error if consumer URI is not set or polling fails
325 ///
326 /// # Examples
327 ///
328 /// ```no_run
329 /// use kafka_http::KafkaHttpClient;
330 ///
331 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
332 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
333 /// // ... create consumer and subscribe ...
334 /// let records = client.poll().awaait?;
335 /// for record in records {
336 /// println!("Received: {:?}", record);
337 /// }
338 /// # Ok(())
339 /// # }
340 /// ```
341 pub async fn poll(&self) -> Result<Vec<Record>, Error> {
342 let Some(consumer_uri) = &self.consumer_uri else {
343 tracing::error!("Consumer URI is not set");
344 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
345 };
346 tracing::debug!("Polling");
347 poll(&self.client, &consumer_uri, self.timeout_ms).await
348 }
349
350 /// Produces records to a Kafka topic.
351 ///
352 /// # Arguments
353 ///
354 /// * `topic` - The name of the topic to produce to
355 /// * `params` - Parameters containing the records to produce
356 ///
357 /// # Returns
358 ///
359 /// * `Ok(())` - Records were successfully produced
360 /// * `Err(Error)` - An error if production fails
361 ///
362 /// # Examples
363 ///
364 /// ```no_run
365 /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
366 ///
367 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
368 /// let client = KafkaHttpClient::new("http://localhost:8082");
369 /// let params = ProduceParams {
370 /// records: vec![/* records */],
371 /// ..Default::default()
372 /// };
373 /// client.produce_to_topic("my-topic", ¶ms).await?;
374 /// # Ok(())
375 /// # }
376 /// ```
377 pub async fn produce_to_topic(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> {
378 tracing::debug!("producing records");
379 produce(&self.client, &self.base_uri, topic, params).await
380 }
381
382 /// Sets the target topic for subsequent produce operations.
383 ///
384 /// This allows using the `produce()` method without specifying a topic each time.
385 /// The topic will be used for all subsequent calls to `produce()` until changed.
386 ///
387 /// # Arguments
388 ///
389 /// * `topic` - The name of the topic to use for future produce operations
390 ///
391 /// # Examples
392 ///
393 /// ```no_run
394 /// use kafka_http::KafkaHttpClient;
395 ///
396 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
397 /// client.set_target_topic("my-topic");
398 /// // Now can use produce() without specifying topic
399 /// ```
400 pub fn set_target_topic(&mut self, topic: &str) {
401 self.target_topic = Some(topic.to_string());
402 }
403
404 /// Produces records to the previously set target topic.
405 ///
406 /// This is a convenience method that uses the topic set via `set_target_topic()`.
407 /// If no target topic has been set, this method will return an error.
408 ///
409 /// # Arguments
410 ///
411 /// * `params` - Parameters containing the records to produce
412 ///
413 /// # Returns
414 ///
415 /// * `Ok(())` - Records were successfully produced
416 /// * `Err(Error)` - An error if target topic is not set or production fails
417 ///
418 /// # Examples
419 ///
420 /// ```no_run
421 /// use kafka_http::{KafkaHttpClient, types::ProduceParams};
422 ///
423 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
424 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
425 /// client.set_target_topic("my-topic");
426 ///
427 /// let params = ProduceParams {
428 /// records: vec![/* records */],
429 /// ..Default::default()
430 /// };
431 /// client.produce(¶ms).await?;
432 /// # Ok(())
433 /// # }
434 /// ```
435 pub async fn produce(&self, params: &ProduceParams) -> Result<(), Error> {
436 let Some(topic) = &self.target_topic else {
437 tracing::error!("Target topic is not set");
438 return Err(Error::new(InvalidInput, "Target topic is not set"));
439 };
440 tracing::debug!("producing records to topic: {topic}");
441 produce(&self.client, &self.base_uri, topic, params).await
442 }
443 /// Commits offsets for consumed messages.
444 ///
445 /// A consumer must be created before calling this method. The consumer URI must be set.
446 ///
447 /// # Arguments
448 ///
449 /// * `params` - Parameters specifying which partition offsets to commit
450 ///
451 /// # Returns
452 ///
453 /// * `Ok(())` - Offsets were successfully committed
454 /// * `Err(Error)` - An error if consumer URI is not set or commit fails
455 ///
456 /// # Examples
457 ///
458 /// ```no_run
459 /// use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
460 ///
461 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
462 /// let mut client = KafkaHttpClient::new("http://localhost:8082");
463 /// // ... create consumer, subscribe, and poll ...
464 /// let params = PartitionOffsetCommitParams {
465 /// partitions: vec![/* partition offsets */],
466 /// ..Default::default()
467 /// };
468 /// client.commit(¶ms).await?;
469 /// # Ok(())
470 /// # }
471 /// ```
472 pub async fn commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> {
473 let Some(consumer_uri) = &self.consumer_uri else {
474 tracing::error!("Consumer URI is not set");
475 return Err(Error::new(InvalidInput, "Consumer URI is not set"));
476 };
477 tracing::debug!("commiting offsets");
478 commit(&self.client, &consumer_uri, params).await
479 }
480}