high_level_kafka/consumer.rs
1use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
2
3use futures::Future;
4use log::{debug, error};
5use rdkafka::{
6 consumer::{Consumer as KafkaConsumer, StreamConsumer},
7 error::KafkaError,
8 message::{Headers, OwnedMessage},
9 ClientConfig, Message as KafkaMessage,
10};
11use tokio::sync::Mutex;
12
13use crate::{KafkaError as Error, KafkaResult, Metadata};
14
15///
16/// A Consumer that can be used to consume messages from kafka and has the ability to pause and resume
17///
18pub struct PausableConsumer<T, F>
19where
20 T: for<'b> serde::Deserialize<'b>,
21 F: Future<Output = ()> + Send + Sync + 'static,
22{
23 consumer: rdkafka::consumer::StreamConsumer,
24 topics_map: HashMap<String, Box<dyn Fn(T, Metadata) -> F>>,
25 is_runnig: Arc<Mutex<bool>>,
26}
27
28///
29/// A Consumer that can be used to consume messages from kafka
30///
31pub struct Consumer<T, F>
32where
33 T: for<'b> serde::Deserialize<'b>,
34 F: Future<Output = ()> + Send + Sync + 'static,
35{
36 consumer: rdkafka::consumer::StreamConsumer,
37 topics_map: HashMap<String, Box<dyn Fn(T, Metadata) -> F>>,
38}
39
/// Configuration options for Consumers.
///
/// Build a value with [`ConsumerOptiopns::from`]; the fields are private.
#[derive(Debug, Clone)]
pub struct ConsumerOptiopns<'a> {
    /// Comma separated bootstrap servers, passed as `bootstrap.servers`.
    bootstrap_servers: String,
    /// Consumer group id, passed as `group.id`.
    group_id: String,
    /// Session timeout in milliseconds (as a string), passed as `session.timeout.ms`.
    session_timeout_ms: String,
    /// Whether `enable.auto.commit` is set to `"true"` or `"false"`.
    enable_auto_commit: bool,
    /// Whether `enable.partition.eof` is set to `"true"` or `"false"`.
    enable_partition_eof: bool,
    /// Additional raw client configuration entries (key/value pairs).
    other_options: HashMap<&'a str, &'a str>,
}

impl<'a> ConsumerOptiopns<'a> {
    /// Creates a new `ConsumerOptiopns`.
    ///
    /// # Arguments
    /// * `bootstrap_servers` - Comma separated bootstrap servers
    /// * `group_id` - The group_id of the consumer
    /// * `session_timeout_ms` - The session timeout in milliseconds
    /// * `enable_auto_commit` - Enable auto commit
    /// * `enable_partition_eof` - Enable partition eof
    /// * `other_options` - Additional client configuration key/value pairs
    ///
    /// # Example
    /// ```
    /// use std::collections::HashMap;
    ///
    /// use simple_kafka::ConsumerOptiopns;
    ///
    /// let consumer_options = ConsumerOptiopns::from(
    ///     "localhost:9092".to_string(),
    ///     "group_id".to_string(),
    ///     "5000".to_string(),
    ///     true,
    ///     true,
    ///     HashMap::new(),
    /// );
    /// ```
    pub fn from(
        bootstrap_servers: String,
        group_id: String,
        session_timeout_ms: String,
        enable_auto_commit: bool,
        enable_partition_eof: bool,
        other_options: HashMap<&'a str, &'a str>,
    ) -> Self {
        Self {
            bootstrap_servers,
            group_id,
            session_timeout_ms,
            enable_auto_commit,
            enable_partition_eof,
            other_options,
        }
    }
}
86
87impl<T, F> PausableConsumer<T, F>
88where
89 T: for<'a> serde::Deserialize<'a>,
90 F: Future<Output = ()> + Send + Sync + 'static,
91{
92 ///
93 /// Creates a new PausableConsumer from a group_id and bootstrap_servers
94 /// # Arguments
95 /// * `group_id` - The group_id of the consumer
96 /// * `bootstrap_servers` - The comma separated bootstrap servers
97 ///
98 /// # Returns
99 /// A tuple of PausableConsumer and a Arc<Mutex<bool>>. The Arc<Mutex<bool>> is used to pause and resume the consumer
100 ///
101 /// # Example
102 /// ```
103 /// use simple_kafka::{PausableConsumer};
104 ///
105 /// let (consumer, paused) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
106 /// ```
107 pub fn from(
108 group_id: &str,
109 bootstrap_servers: &str,
110 ) -> Result<(Self, Arc<Mutex<bool>>), Error> {
111 let consumer = ClientConfig::new()
112 .set("group.id", group_id)
113 .set("bootstrap.servers", bootstrap_servers)
114 .set("enable.partition.eof", "false")
115 .set("session.timeout.ms", "6000")
116 .set("enable.auto.commit", "true")
117 .create::<StreamConsumer>();
118
119 if let Err(error) = consumer {
120 return Err(Error::Kafka(error));
121 }
122
123 let is_runnig = Arc::new(Mutex::new(true));
124 let consumer = consumer.unwrap();
125 let pausable_consumer = PausableConsumer {
126 consumer,
127 topics_map: HashMap::new(),
128 is_runnig: is_runnig.clone(),
129 };
130
131 Ok((pausable_consumer, is_runnig))
132 }
133
134 ///
135 /// Creates a new PausedConsumer from consumer options
136 /// # Arguments
137 /// * `options` - A ConsumerOptions struct that holds the consumer options
138 ///
139 /// # Returns
140 /// A tuple of PausableConsumer and a Arc<Mutex<bool>>. The Arc<Mutex<bool>> is used to pause and resume the consumer
141 ///
142 /// # Example
143 /// ```
144 /// use simple_kafka::{PausableConsumer, ConsumerOptiopns};
145 ///
146 /// let options = ConsumerOptiopns {
147 /// bootstrap_servers: "localhost:9092".to_string(),
148 /// group_id: "group_id".to_string(),
149 /// session_timeout_ms: "6000".to_string(),
150 /// enable_auto_commit: true,
151 /// enable_partition_eof: false,
152 /// };
153 /// let consumer = PausableConsumer::with_options(options)?;
154 /// ```
155 pub fn with_options(options: ConsumerOptiopns) -> Result<(Self, Arc<Mutex<bool>>), Error> {
156 let enable_partition_eof = match options.enable_partition_eof {
157 true => "true",
158 false => "false",
159 };
160
161 let enable_auto_commit = match options.enable_auto_commit {
162 true => "true",
163 false => "false",
164 };
165 let mut binding = ClientConfig::new();
166 let consumer = binding
167 .set("group.id", options.group_id.as_str())
168 .set("bootstrap.servers", options.bootstrap_servers.as_str())
169 .set("enable.partition.eof", enable_partition_eof)
170 .set("session.timeout.ms", options.session_timeout_ms.as_str())
171 .set("enable.auto.commit", enable_auto_commit);
172
173 options.other_options.iter().for_each(|(key, value)| {
174 consumer.set(*key, *value);
175 });
176
177 let consumer = consumer.create::<StreamConsumer>();
178 if let Err(error) = consumer {
179 return Err(Error::Kafka(error));
180 }
181
182 let is_runnig = Arc::new(Mutex::new(true));
183 let consumer = consumer.unwrap();
184 let pausable_consumer = PausableConsumer {
185 consumer,
186 topics_map: HashMap::new(),
187 is_runnig: is_runnig.clone(),
188 };
189 Ok((pausable_consumer, is_runnig))
190 }
191
192 /// FIXME: Not ready yet
193 /// Add message handles to the cosnumer
194 /// Currently only support one handler.
195 ///
196 /// # Arguments
197 /// * `topic` - The topic to subscribe to
198 /// * `handler` - The handler function that will be called for each message for the give `topic`
199 ///
200 /// # Example
201 /// ```
202 /// use simple_kafka::{PausableConsumer};
203 /// #[derive(Serialize, Deserialize, Debug)]
204 /// struct Data {
205 /// attra_one: String,
206 /// attra_two: i8,
207 /// }
208 ///
209 /// let consumer = PausableConsumer::from("group_id", "localhost:9092");
210 /// let handler_1 = Box::new(| data: Data, metadata: Metadata| async move {
211 /// println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
212 /// });
213 /// consumer.add("topic_1".to_string(), handler_1);
214 /// consumer.subscribe().await;
215 /// ```
216 fn add(&mut self, topic: String, handler: Box<dyn Fn(T, Metadata) -> F>) {
217 self.topics_map.insert(topic, handler);
218 }
219
220 /// FIXME: Not ready yet
221 /// Subcribes to given set of topics and calls the given function for each message
222 ///
223 /// # Example
224 /// ```
225 /// use simple_kafka::{PausableConsumer};
226 /// #[derive(Serialize, Deserialize, Debug)]
227 /// struct Data {
228 /// attra_one: String,
229 /// attra_two: i8,
230 /// }
231 ///
232 /// let consumer = PausableConsumer::from("group_id", "localhost:9092");
233 /// let handler_1 = Box::new(| data: Data, metadata: Metadata| async move {
234 /// println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
235 /// });
236 /// consumer.add("topic_1".to_string(), handler_1);
237 /// consumer.subscribe().await;
238 /// ```
239 async fn subscribe(&self) {
240 let topics = self
241 .topics_map
242 .keys()
243 .map(|key| key.as_str())
244 .collect::<Vec<&str>>();
245
246 self.consumer
247 .subscribe(&topics)
248 .expect("Can't subscribe to specified topic");
249 loop {
250 let is_runnig = self.is_runnig.lock().await;
251 debug!("Subscriber is running: {:?}", *is_runnig);
252 if !(*is_runnig) {
253 debug!("Subscriber is stopped");
254 tokio::time::sleep(Duration::from_secs(10)).await;
255 continue;
256 }
257
258 match self.consumer.recv().await {
259 Ok(message) => {
260 let owned_message = message.detach();
261 handle_message(owned_message, &self.topics_map).await;
262 }
263 Err(error) => handle_error(error).await,
264 };
265 }
266 }
267
268 ///
269 /// Subscribe to a given topic and calls the given function for each message
270 ///
271 /// # Arguments
272 /// * `topic` - The topic to subscribe to
273 /// * `handler` - The handler function that will be called for each message for the give `topic`
274 ///
275 /// # Example
276 /// ```
277 /// use simple_kafka::{Consumer};
278 /// #[derive(Serialize, Deserialize, Debug)]
279 /// struct Data {
280 /// attra_one: String,
281 /// attra_two: i8,
282 /// }
283 ///
284 /// let consumer = Consumer::from("group_id", "localhost:9092");
285 /// let handler_1 = | data: Data, metadata: Metadata| async move {
286 /// println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
287 /// };
288 /// consumer.subscribe_to_topic("topic_1".to_string(), handler_1).await.unwrap();
289 /// ```
290 pub async fn subscribe_to_topic<H>(&mut self, topic: &str, handler: H) -> Result<(), Error>
291 where
292 H: Fn(KafkaResult<T>) -> F,
293 {
294 let subscribe = self.consumer.subscribe(&[topic]);
295 if let Err(error) = subscribe {
296 return Err(Error::Kafka(error));
297 }
298
299 loop {
300 let is_runnig = self.is_runnig.lock().await;
301 debug!("Subscriber is running: {:?}", *is_runnig);
302 if !*is_runnig {
303 drop(is_runnig);
304 debug!("Subscriber is apused");
305 tokio::time::sleep(Duration::from_secs(10)).await;
306 continue;
307 }
308
309 match self.consumer.recv().await {
310 Ok(message) => {
311 let owned_message = message.detach();
312 single_handle_message(owned_message, &handler).await;
313 }
314 Err(error) => handle_error(error).await,
315 };
316 }
317 }
318
319 ///
320 /// Pause the consumer. Consumer will not request new messages but will keep the connection to the broker alive
321 /// This is useful when you want to pause the consumer for a while and resume it later without having to reconnect to the broker
322 ///
323 /// # Example
324 /// ```
325 /// use simple_kafka::{Consumer};
326 ///
327 /// #[tokio::main]
328 /// async fn main() {
329 /// let (mut consumer, is_runnig) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
330 /// let handler = consumer.subscribe_to_topic("topic".to_string(), |data: Data, medatad: Metadata| async move {
331 /// info!("data: {:?}, metadata: {:?}", data, medatad);
332 /// });
333 /// consumer.pause().await;
334 /// }
335 /// ```
336 pub async fn pause(&self) {
337 let mut is_runnig = self.is_runnig.lock().await;
338 *is_runnig = false;
339 }
340
341 ///
342 /// Resume the consumer
343 ///
344 /// # Example
345 /// ```
346 /// use simple_kafka::{Consumer};
347 ///
348 /// #[tokio::main]
349 /// async fn main() {
350 /// let (mut consumer, is_runnig) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
351 /// let handler = consumer.subscribe_to_topic("topic".to_string(), |data: Data, medatad: Metadata| async move {
352 /// info!("data: {:?}, metadata: {:?}", data, medatad);
353 /// });
354 /// consumer.pause().await;
355 /// consumer.resume().await;
356 /// }
357 /// ```
358 ///
359 pub async fn resume(&self) {
360 let mut is_runnig = self.is_runnig.lock().await;
361 *is_runnig = true;
362 }
363}
364
365impl<T, F> Consumer<T, F>
366where
367 T: for<'a> serde::Deserialize<'a>,
368 F: Future<Output = ()> + Send + Sync + 'static,
369{
370 ///
371 /// Creates a new Consumer from group id and bootstrap servers
372 /// # Arguments
373 /// * `group_id` - The group_id of the consumer
374 /// * `bootstrap_servers` - The comma separated bootstrap servers
375 ///
376 /// # Example
377 /// ```
378 /// use simple_kafka::{Consumer};
379 /// let consumer = Consumer::from("group_id", "localhost:9092").unwrap();
380 /// ```
381 pub fn from(group_id: &str, bootstrap_servers: &str) -> Result<Self, Error> {
382 let consumer = ClientConfig::new()
383 .set("group.id", group_id)
384 .set("bootstrap.servers", bootstrap_servers)
385 .set("enable.partition.eof", "false")
386 .set("session.timeout.ms", "6000")
387 .set("enable.auto.commit", "true")
388 .create::<StreamConsumer>();
389
390 if let Err(error) = consumer {
391 return Err(Error::Kafka(error));
392 }
393 let consumer = consumer.unwrap();
394 Ok(Consumer {
395 consumer,
396 topics_map: HashMap::new(),
397 })
398 }
399
400 ///
401 /// Creates a new Consumer from consumer options
402 /// # Arguments
403 /// * `options` - A ConsumerOptions struct that holds the consumer options
404 ///
405 /// # Example
406 /// ```
407 /// use simple_kafka::{Consumer, ConsumerOptiopns};
408 ///
409 /// let options = ConsumerOptiopns {
410 /// bootstrap_servers: "localhost:9092".to_string(),
411 /// group_id: "group_id".to_string(),
412 /// session_timeout_ms: "6000".to_string(),
413 /// enable_auto_commit: true,
414 /// enable_partition_eof: false,
415 /// };
416 /// let consumer = Consumer::with_options(options).unwrap();
417 /// ```
418 pub fn with_options(options: ConsumerOptiopns) -> Result<Self, Error> {
419 let enable_partition_eof = match options.enable_partition_eof {
420 true => "true",
421 false => "false",
422 };
423
424 let enable_auto_commit = match options.enable_auto_commit {
425 true => "true",
426 false => "false",
427 };
428 let consumer = ClientConfig::new()
429 .set("group.id", options.group_id.as_str())
430 .set("bootstrap.servers", options.bootstrap_servers.as_str())
431 .set("enable.partition.eof", enable_partition_eof)
432 .set("session.timeout.ms", options.session_timeout_ms.as_str())
433 .set("enable.auto.commit", enable_auto_commit)
434 .create::<StreamConsumer>();
435
436 if let Err(error) = consumer {
437 return Err(Error::Kafka(error));
438 }
439 let consumer = consumer.unwrap();
440 Ok(Consumer {
441 consumer,
442 topics_map: HashMap::new(),
443 })
444 }
445
446 /// FIXME: Not ready yet
447 /// Add message handles to the cosnumer
448 /// Currently only support one handler.
449 ///
450 /// # Arguments
451 /// * `topic` - The topic to subscribe to
452 /// * `handler` - The handler function that will be called for each message for the give `topic`
453 ///
454 /// # Example
455 /// ```
456 /// use simple_kafka::{Consumer};
457 /// #[derive(Serialize, Deserialize, Debug)]
458 /// struct Data {
459 /// attra_one: String,
460 /// attra_two: i8,
461 /// }
462 ///
463 /// let consumer = Consumer::from("group_id", "localhost:9092");
464 /// let handler_1 = Box::new(| data: Data, metadata: Metadata| async move {
465 /// println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
466 /// });
467 /// consumer.add("topic_1".to_string(), handler_1);
468 /// consumer.subscribe().await;
469 /// ```
470 fn add(&mut self, topic: String, handler: Box<dyn Fn(T, Metadata) -> F>) {
471 self.topics_map.insert(topic, handler);
472 }
473
474 /// FIXME: Not ready yet
475 /// Subcribes to given set of topics and calls the given function for each message
476 ///
477 /// # Example
478 /// ```
479 /// use simple_kafka::{Consumer};
480 /// #[derive(Serialize, Deserialize, Debug)]
481 /// struct Data {
482 /// attra_one: String,
483 /// attra_two: i8,
484 /// }
485 ///
486 /// let consumer = Consumer::from("group_id", "localhost:9092");
487 /// let handler_1 = Box::new(| data: Data, metadata: Metadata| async move {
488 /// println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
489 /// });
490 /// consumer.add("topic_1".to_string(), handler_1);
491 /// consumer.subscribe().await;
492 /// ```
493 async fn subscribe(&self) {
494 let topics = self
495 .topics_map
496 .keys()
497 .map(|key| key.as_str())
498 .collect::<Vec<&str>>();
499
500 self.consumer
501 .subscribe(&topics)
502 .expect("Can't subscribe to specified topic");
503 loop {
504 match self.consumer.recv().await {
505 Ok(message) => {
506 let owned_message = message.detach();
507 handle_message(owned_message, &self.topics_map).await;
508 }
509 Err(error) => handle_error(error).await,
510 };
511 }
512 }
513
514 ///
515 /// Subscribe to a given topic and calls the given function for each message
516 ///
517 /// # Arguments
518 /// * `topic` - The topic to subscribe to
519 /// * `handler` - The handler function that will be called for each message for the give `topic`
520 ///
521 /// # Example
522 /// ```
523 /// use simple_kafka::{Consumer};
524 /// #[derive(Serialize, Deserialize, Debug)]
525 /// struct Data {
526 /// attra_one: String,
527 /// attra_two: i8,
528 /// }
529 ///
530 /// let mut consumer = Consumer::from("group_id", "localhost:9092");
531 /// let handler = consumer.subscribe_to_topic("topic".to_string(), |data: Data, medatad: Metadata| async move {
532 /// info!("data: {:?}, metadata: {:?}", data, medatad);
533 /// });
534 /// handler.await;
535 /// ```
536 pub async fn subscribe_to_topic<H>(&mut self, topic: &str, handler: H)
537 where
538 H: Fn(KafkaResult<T>) -> F,
539 {
540 self.consumer
541 .subscribe(&[topic])
542 .expect("Can't subscribe to specified topic");
543
544 loop {
545 match self.consumer.recv().await {
546 Ok(message) => {
547 let owned_message = message.detach();
548 single_handle_message(owned_message, &handler).await;
549 }
550 Err(error) => {
551 handler(KafkaResult::Err(Error::Kafka(error))).await;
552 }
553 };
554 }
555 }
556}
557
558async fn handle_message<F, T>(
559 owned_message: OwnedMessage,
560 topics_map: &HashMap<String, impl Fn(T, Metadata) -> F>,
561) where
562 F: Future<Output = ()> + Send + Sync + 'static,
563 T: for<'a> serde::Deserialize<'a>,
564{
565 let payload = owned_message.payload().unwrap();
566 let topic = owned_message.topic().to_string();
567 let handler = topics_map.get(topic.as_str()).unwrap();
568 let partition = owned_message.partition();
569 let offset = owned_message.offset();
570 let headers = extract_headers(&owned_message);
571
572 let metadata = Metadata {
573 topic,
574 partition,
575 offset,
576 headers,
577 };
578
579 let message: T = serde_json::from_slice(payload).unwrap();
580 handler(message, metadata).await;
581}
582
583async fn single_handle_message<F, T>(
584 owned_message: OwnedMessage,
585 handler: &impl Fn(KafkaResult<T>) -> F,
586) where
587 F: Future<Output = ()> + Send + Sync + 'static,
588 T: for<'a> serde::Deserialize<'a>,
589{
590 let payload = owned_message.payload();
591 let Some(payload) = payload else {
592 handler(KafkaResult::Ok(None)).await;
593 return;
594 };
595
596 let topic = owned_message.topic().to_string();
597 let partition = owned_message.partition();
598 let offset = owned_message.offset();
599 let headers = extract_headers(&owned_message);
600
601 let metadata = Metadata {
602 topic,
603 partition,
604 offset,
605 headers,
606 };
607
608 let message = serde_json::from_slice::<T>(payload);
609 match message {
610 Ok(message) => {
611 handler(KafkaResult::Ok(Some((message, metadata)))).await;
612 }
613 Err(err) => {
614 handler(KafkaResult::Err(Error::Serde(err))).await;
615 }
616 }
617}
618
619async fn handle_error(error: KafkaError) {
620 error!("Error while receiving message: {:?}", error);
621 match error {
622 rdkafka::error::KafkaError::Global(code) => match code {
623 rdkafka::types::RDKafkaErrorCode::BrokerTransportFailure => {
624 error!("Broker transport failure");
625 tokio::time::sleep(Duration::from_secs(10)).await;
626 }
627 _ => {
628 error!("Error while receiving message: {:?}", error)
629 }
630 },
631 _ => {
632 error!("Error while receiving message: {:?}", error);
633 }
634 }
635}
636
637fn extract_headers(owned_message: &rdkafka::message::OwnedMessage) -> HashMap<String, String> {
638 let headers = match owned_message.headers() {
639 Some(headers) => {
640 let mut map = HashMap::new();
641 for header in headers.iter() {
642 if let Some(value) = header.value {
643 let key = String::from(header.key);
644 if let Ok(value) = String::from_utf8(value.to_vec()) {
645 map.insert(key, value);
646 }
647 }
648 }
649 map
650 }
651 None => HashMap::new(),
652 };
653 headers
654}
655
#[cfg(test)]
mod tests {
    use log::info;
    use serde::{Deserialize, Serialize};

    use super::*;

    /// Payload type the test handlers deserialize messages into.
    #[derive(Serialize, Deserialize, Debug)]
    struct Data {
        attra_one: String,
        attra_two: i8,
    }

    // Both tests talk to a broker at localhost:9092 and call `subscribe_to_topic`, whose
    // receive loop never returns. They are ignored by default so a plain `cargo test` does
    // not hang; run them manually with `cargo test -- --ignored`.

    /// Creates a `Consumer` and consumes from `topic`, logging every decoded message.
    #[tokio::test]
    #[ignore = "requires a Kafka broker at localhost:9092 and never returns"]
    async fn create_consumer_test() {
        let mut consumer = Consumer::from("group_id", "localhost:9092").unwrap();
        let handler =
            consumer.subscribe_to_topic("topic", |result: KafkaResult<Data>| async move {
                if let KafkaResult::Ok(Some((data, metadata))) = result {
                    info!("data: {:?}, metadata: {:?}", data, metadata);
                }
            });
        handler.await;
    }

    /// Creates a `PausableConsumer` and consumes from `topic`, printing every decoded message.
    #[tokio::test]
    #[ignore = "requires a Kafka broker at localhost:9092 and never returns"]
    async fn create_pausable_consumer_test() {
        let (mut consumer, _is_runnit) =
            PausableConsumer::from("group_id", "localhost:9092").unwrap();
        let handler_1 = Box::new(|result: KafkaResult<Data>| async move {
            if let KafkaResult::Ok(Some((data, metadata))) = result {
                println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
            }
        });

        consumer
            .subscribe_to_topic("topic", handler_1)
            .await
            .unwrap();
    }
}