high_level_kafka/consumer.rs

use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::Future;
use log::{debug, error};
use rdkafka::{
    consumer::{Consumer as KafkaConsumer, StreamConsumer},
    error::KafkaError,
    message::{Headers, OwnedMessage},
    ClientConfig, Message as KafkaMessage,
};
use tokio::sync::Mutex;

use crate::{KafkaError as Error, KafkaResult, Metadata};

///
/// A consumer that can be used to consume messages from Kafka, with the ability to pause and resume.
///
pub struct PausableConsumer<T, F>
where
    T: for<'b> serde::Deserialize<'b>,
    F: Future<Output = ()> + Send + Sync + 'static,
{
    consumer: StreamConsumer,
    topics_map: HashMap<String, Box<dyn Fn(T, Metadata) -> F>>,
    is_running: Arc<Mutex<bool>>,
}

///
/// A consumer that can be used to consume messages from Kafka.
///
pub struct Consumer<T, F>
where
    T: for<'b> serde::Deserialize<'b>,
    F: Future<Output = ()> + Send + Sync + 'static,
{
    consumer: StreamConsumer,
    topics_map: HashMap<String, Box<dyn Fn(T, Metadata) -> F>>,
}

///
/// Configuration options for consumers.
///
pub struct ConsumerOptions<'a> {
    bootstrap_servers: String,
    group_id: String,
    session_timeout_ms: String,
    enable_auto_commit: bool,
    enable_partition_eof: bool,
    other_options: HashMap<&'a str, &'a str>,
}

impl<'a> ConsumerOptions<'a> {
    ///
    /// Creates a new ConsumerOptions.
    /// # Arguments
    /// * `bootstrap_servers` - Comma-separated list of bootstrap servers
    /// * `group_id` - The group id of the consumer
    /// * `session_timeout_ms` - The session timeout in milliseconds
    /// * `enable_auto_commit` - Enable auto commit
    /// * `enable_partition_eof` - Enable partition EOF events
    /// * `other_options` - Additional librdkafka settings as key/value pairs
    ///
    /// # Example
    /// ```
    /// use std::collections::HashMap;
    /// use simple_kafka::ConsumerOptions;
    ///
    /// let consumer_options = ConsumerOptions::from(
    ///     "localhost:9092".to_string(),
    ///     "group_id".to_string(),
    ///     "5000".to_string(),
    ///     true,
    ///     true,
    ///     HashMap::new(),
    /// );
    /// ```
    pub fn from(
        bootstrap_servers: String,
        group_id: String,
        session_timeout_ms: String,
        enable_auto_commit: bool,
        enable_partition_eof: bool,
        other_options: HashMap<&'a str, &'a str>,
    ) -> Self {
        ConsumerOptions {
            bootstrap_servers,
            group_id,
            session_timeout_ms,
            enable_auto_commit,
            enable_partition_eof,
            other_options,
        }
    }
}
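
// A minimal sketch of passing extra librdkafka settings through `other_options`
// (the `auto.offset.reset` key shown here is a standard librdkafka property;
// the values are placeholders, not recommendations).
#[allow(dead_code)]
fn consumer_options_sketch() -> ConsumerOptions<'static> {
    let mut other_options = HashMap::new();
    // Start from the earliest offset when the group has no committed offset yet.
    other_options.insert("auto.offset.reset", "earliest");

    ConsumerOptions::from(
        "localhost:9092".to_string(),
        "group_id".to_string(),
        "6000".to_string(),
        true,
        false,
        other_options,
    )
}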

impl<T, F> PausableConsumer<T, F>
where
    T: for<'a> serde::Deserialize<'a>,
    F: Future<Output = ()> + Send + Sync + 'static,
{
    ///
    /// Creates a new PausableConsumer from a group id and bootstrap servers.
    /// # Arguments
    /// * `group_id` - The group id of the consumer
    /// * `bootstrap_servers` - Comma-separated list of bootstrap servers
    ///
    /// # Returns
    /// A tuple of the PausableConsumer and an `Arc<Mutex<bool>>` that is used to pause and resume the consumer.
    ///
    /// # Example
    /// ```
    /// use simple_kafka::PausableConsumer;
    ///
    /// let (consumer, is_running) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
    /// ```
    pub fn from(
        group_id: &str,
        bootstrap_servers: &str,
    ) -> Result<(Self, Arc<Mutex<bool>>), Error> {
        let consumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", bootstrap_servers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            .create::<StreamConsumer>()
            .map_err(Error::Kafka)?;

        let is_running = Arc::new(Mutex::new(true));
        let pausable_consumer = PausableConsumer {
            consumer,
            topics_map: HashMap::new(),
            is_running: is_running.clone(),
        };

        Ok((pausable_consumer, is_running))
    }

    ///
    /// Creates a new PausableConsumer from consumer options.
    /// # Arguments
    /// * `options` - A ConsumerOptions struct that holds the consumer options
    ///
    /// # Returns
    /// A tuple of the PausableConsumer and an `Arc<Mutex<bool>>` that is used to pause and resume the consumer.
    ///
    /// # Example
    /// ```
    /// use std::collections::HashMap;
    /// use simple_kafka::{ConsumerOptions, PausableConsumer};
    ///
    /// let options = ConsumerOptions::from(
    ///     "localhost:9092".to_string(),
    ///     "group_id".to_string(),
    ///     "6000".to_string(),
    ///     true,
    ///     false,
    ///     HashMap::new(),
    /// );
    /// let (consumer, is_running) = PausableConsumer::with_options(options).unwrap();
    /// ```
    pub fn with_options(options: ConsumerOptions) -> Result<(Self, Arc<Mutex<bool>>), Error> {
        let enable_partition_eof = if options.enable_partition_eof { "true" } else { "false" };
        let enable_auto_commit = if options.enable_auto_commit { "true" } else { "false" };

        let mut config = ClientConfig::new();
        config
            .set("group.id", options.group_id.as_str())
            .set("bootstrap.servers", options.bootstrap_servers.as_str())
            .set("enable.partition.eof", enable_partition_eof)
            .set("session.timeout.ms", options.session_timeout_ms.as_str())
            .set("enable.auto.commit", enable_auto_commit);

        options.other_options.iter().for_each(|(key, value)| {
            config.set(*key, *value);
        });

        let consumer = config.create::<StreamConsumer>().map_err(Error::Kafka)?;

        let is_running = Arc::new(Mutex::new(true));
        let pausable_consumer = PausableConsumer {
            consumer,
            topics_map: HashMap::new(),
            is_running: is_running.clone(),
        };
        Ok((pausable_consumer, is_running))
    }

    /// FIXME: Not ready yet
    /// Adds a message handler to the consumer.
    /// Currently only one handler per topic is supported.
    ///
    /// # Arguments
    /// * `topic` - The topic to subscribe to
    /// * `handler` - The handler function that will be called for each message on the given `topic`
    ///
    /// # Example
    /// ```
    /// use simple_kafka::PausableConsumer;
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct Data {
    ///     attra_one: String,
    ///     attra_two: i8,
    /// }
    ///
    /// let (mut consumer, is_running) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
    /// let handler_1 = Box::new(|data: Data, metadata: Metadata| async move {
    ///     println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
    /// });
    /// consumer.add("topic_1".to_string(), handler_1);
    /// consumer.subscribe().await;
    /// ```
    fn add(&mut self, topic: String, handler: Box<dyn Fn(T, Metadata) -> F>) {
        self.topics_map.insert(topic, handler);
    }

    /// FIXME: Not ready yet
    /// Subscribes to the registered set of topics and calls the matching handler for each message.
    ///
    /// # Example
    /// ```
    /// use simple_kafka::PausableConsumer;
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct Data {
    ///     attra_one: String,
    ///     attra_two: i8,
    /// }
    ///
    /// let (mut consumer, is_running) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
    /// let handler_1 = Box::new(|data: Data, metadata: Metadata| async move {
    ///     println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
    /// });
    /// consumer.add("topic_1".to_string(), handler_1);
    /// consumer.subscribe().await;
    /// ```
    async fn subscribe(&self) {
        let topics = self
            .topics_map
            .keys()
            .map(|key| key.as_str())
            .collect::<Vec<&str>>();

        self.consumer
            .subscribe(&topics)
            .expect("Can't subscribe to specified topics");
        loop {
            let is_running = self.is_running.lock().await;
            debug!("Subscriber is running: {:?}", *is_running);
            if !*is_running {
                // Release the lock before sleeping so `resume` is not blocked.
                drop(is_running);
                debug!("Subscriber is paused");
                tokio::time::sleep(Duration::from_secs(10)).await;
                continue;
            }
            // Release the lock before awaiting a message so `pause` is not blocked.
            drop(is_running);

            match self.consumer.recv().await {
                Ok(message) => {
                    let owned_message = message.detach();
                    handle_message(owned_message, &self.topics_map).await;
                }
                Err(error) => handle_error(error).await,
            };
        }
    }

    ///
    /// Subscribes to a given topic and calls the given handler for each message.
    ///
    /// # Arguments
    /// * `topic` - The topic to subscribe to
    /// * `handler` - The handler function that will be called for each message on the given `topic`
    ///
    /// # Example
    /// ```
    /// use simple_kafka::{KafkaResult, PausableConsumer};
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct Data {
    ///     attra_one: String,
    ///     attra_two: i8,
    /// }
    ///
    /// let (mut consumer, is_running) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
    /// let handler_1 = |result: KafkaResult<Data>| async move {
    ///     println!("Handler One ::: result: {:?}", result);
    /// };
    /// consumer.subscribe_to_topic("topic_1", handler_1).await.unwrap();
    /// ```
    pub async fn subscribe_to_topic<H>(&mut self, topic: &str, handler: H) -> Result<(), Error>
    where
        H: Fn(KafkaResult<T>) -> F,
    {
        let subscribe = self.consumer.subscribe(&[topic]);
        if let Err(error) = subscribe {
            return Err(Error::Kafka(error));
        }

        loop {
            let is_running = self.is_running.lock().await;
            debug!("Subscriber is running: {:?}", *is_running);
            if !*is_running {
                // Release the lock before sleeping so `resume` is not blocked.
                drop(is_running);
                debug!("Subscriber is paused");
                tokio::time::sleep(Duration::from_secs(10)).await;
                continue;
            }
            // Release the lock before awaiting a message so `pause` is not blocked.
            drop(is_running);

            match self.consumer.recv().await {
                Ok(message) => {
                    let owned_message = message.detach();
                    single_handle_message(owned_message, &handler).await;
                }
                Err(error) => handle_error(error).await,
            };
        }
    }

    ///
    /// Pauses the consumer. The consumer will not request new messages but will keep the connection to the broker alive.
    /// This is useful when you want to pause the consumer for a while and resume it later without having to reconnect to the broker.
    ///
    /// # Example
    /// ```
    /// use simple_kafka::{KafkaResult, PausableConsumer};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (mut consumer, is_running) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
    ///     let handler = consumer.subscribe_to_topic("topic", |result: KafkaResult<Data>| async move {
    ///         info!("result: {:?}", result);
    ///     });
    ///     consumer.pause().await;
    /// }
    /// ```
    pub async fn pause(&self) {
        let mut is_running = self.is_running.lock().await;
        *is_running = false;
    }

    ///
    /// Resumes the consumer.
    ///
    /// # Example
    /// ```
    /// use simple_kafka::{KafkaResult, PausableConsumer};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (mut consumer, is_running) = PausableConsumer::from("group_id", "localhost:9092").unwrap();
    ///     let handler = consumer.subscribe_to_topic("topic", |result: KafkaResult<Data>| async move {
    ///         info!("result: {:?}", result);
    ///     });
    ///     consumer.pause().await;
    ///     consumer.resume().await;
    /// }
    /// ```
    ///
    pub async fn resume(&self) {
        let mut is_running = self.is_running.lock().await;
        *is_running = true;
    }
}
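
// A minimal sketch of the pause/resume contract: the `Arc<Mutex<bool>>` returned
// by `PausableConsumer::from` is the same flag that `pause` and `resume` flip, so
// a task holding only the handle can control a subscriber running elsewhere.
// `serde_json::Value` and `std::future::Ready<()>` merely pin the generic
// parameters for this sketch; any payload/handler types behave the same way.
#[allow(dead_code)]
async fn pause_resume_sketch() -> Result<(), Error> {
    let (consumer, is_running) =
        PausableConsumer::<serde_json::Value, std::future::Ready<()>>::from(
            "group_id",
            "localhost:9092",
        )?;

    // Flipping the shared flag is equivalent to calling the methods...
    *is_running.lock().await = false; // pause polling
    *is_running.lock().await = true; // resume polling

    // ...which toggle the same flag internally.
    consumer.pause().await;
    consumer.resume().await;
    Ok(())
}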

impl<T, F> Consumer<T, F>
where
    T: for<'a> serde::Deserialize<'a>,
    F: Future<Output = ()> + Send + Sync + 'static,
{
    ///
    /// Creates a new Consumer from a group id and bootstrap servers.
    /// # Arguments
    /// * `group_id` - The group id of the consumer
    /// * `bootstrap_servers` - Comma-separated list of bootstrap servers
    ///
    /// # Example
    /// ```
    /// use simple_kafka::Consumer;
    /// let consumer = Consumer::from("group_id", "localhost:9092").unwrap();
    /// ```
    pub fn from(group_id: &str, bootstrap_servers: &str) -> Result<Self, Error> {
        let consumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", bootstrap_servers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            .create::<StreamConsumer>()
            .map_err(Error::Kafka)?;

        Ok(Consumer {
            consumer,
            topics_map: HashMap::new(),
        })
    }

    ///
    /// Creates a new Consumer from consumer options.
    /// # Arguments
    /// * `options` - A ConsumerOptions struct that holds the consumer options
    ///
    /// # Example
    /// ```
    /// use std::collections::HashMap;
    /// use simple_kafka::{Consumer, ConsumerOptions};
    ///
    /// let options = ConsumerOptions::from(
    ///     "localhost:9092".to_string(),
    ///     "group_id".to_string(),
    ///     "6000".to_string(),
    ///     true,
    ///     false,
    ///     HashMap::new(),
    /// );
    /// let consumer = Consumer::with_options(options).unwrap();
    /// ```
    pub fn with_options(options: ConsumerOptions) -> Result<Self, Error> {
        let enable_partition_eof = if options.enable_partition_eof { "true" } else { "false" };
        let enable_auto_commit = if options.enable_auto_commit { "true" } else { "false" };

        let mut config = ClientConfig::new();
        config
            .set("group.id", options.group_id.as_str())
            .set("bootstrap.servers", options.bootstrap_servers.as_str())
            .set("enable.partition.eof", enable_partition_eof)
            .set("session.timeout.ms", options.session_timeout_ms.as_str())
            .set("enable.auto.commit", enable_auto_commit);

        // Apply any additional librdkafka settings.
        options.other_options.iter().for_each(|(key, value)| {
            config.set(*key, *value);
        });

        let consumer = config.create::<StreamConsumer>().map_err(Error::Kafka)?;

        Ok(Consumer {
            consumer,
            topics_map: HashMap::new(),
        })
    }

    /// FIXME: Not ready yet
    /// Adds a message handler to the consumer.
    /// Currently only one handler per topic is supported.
    ///
    /// # Arguments
    /// * `topic` - The topic to subscribe to
    /// * `handler` - The handler function that will be called for each message on the given `topic`
    ///
    /// # Example
    /// ```
    /// use simple_kafka::Consumer;
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct Data {
    ///     attra_one: String,
    ///     attra_two: i8,
    /// }
    ///
    /// let mut consumer = Consumer::from("group_id", "localhost:9092").unwrap();
    /// let handler_1 = Box::new(|data: Data, metadata: Metadata| async move {
    ///     println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
    /// });
    /// consumer.add("topic_1".to_string(), handler_1);
    /// consumer.subscribe().await;
    /// ```
    fn add(&mut self, topic: String, handler: Box<dyn Fn(T, Metadata) -> F>) {
        self.topics_map.insert(topic, handler);
    }

    /// FIXME: Not ready yet
    /// Subscribes to the registered set of topics and calls the matching handler for each message.
    ///
    /// # Example
    /// ```
    /// use simple_kafka::Consumer;
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct Data {
    ///     attra_one: String,
    ///     attra_two: i8,
    /// }
    ///
    /// let mut consumer = Consumer::from("group_id", "localhost:9092").unwrap();
    /// let handler_1 = Box::new(|data: Data, metadata: Metadata| async move {
    ///     println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
    /// });
    /// consumer.add("topic_1".to_string(), handler_1);
    /// consumer.subscribe().await;
    /// ```
    async fn subscribe(&self) {
        let topics = self
            .topics_map
            .keys()
            .map(|key| key.as_str())
            .collect::<Vec<&str>>();

        self.consumer
            .subscribe(&topics)
            .expect("Can't subscribe to specified topics");
        loop {
            match self.consumer.recv().await {
                Ok(message) => {
                    let owned_message = message.detach();
                    handle_message(owned_message, &self.topics_map).await;
                }
                Err(error) => handle_error(error).await,
            };
        }
    }

    ///
    /// Subscribes to a given topic and calls the given handler for each message.
    ///
    /// # Arguments
    /// * `topic` - The topic to subscribe to
    /// * `handler` - The handler function that will be called for each message on the given `topic`
    ///
    /// # Example
    /// ```
    /// use simple_kafka::{Consumer, KafkaResult};
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct Data {
    ///     attra_one: String,
    ///     attra_two: i8,
    /// }
    ///
    /// let mut consumer = Consumer::from("group_id", "localhost:9092").unwrap();
    /// let handler = consumer.subscribe_to_topic("topic", |result: KafkaResult<Data>| async move {
    ///     info!("result: {:?}", result);
    /// });
    /// handler.await;
    /// ```
    pub async fn subscribe_to_topic<H>(&mut self, topic: &str, handler: H)
    where
        H: Fn(KafkaResult<T>) -> F,
    {
        self.consumer
            .subscribe(&[topic])
            .expect("Can't subscribe to specified topic");

        loop {
            match self.consumer.recv().await {
                Ok(message) => {
                    let owned_message = message.detach();
                    single_handle_message(owned_message, &handler).await;
                }
                Err(error) => {
                    handler(KafkaResult::Err(Error::Kafka(error))).await;
                }
            };
        }
    }
}

async fn handle_message<F, T>(
    owned_message: OwnedMessage,
    topics_map: &HashMap<String, impl Fn(T, Metadata) -> F>,
) where
    F: Future<Output = ()> + Send + Sync + 'static,
    T: for<'a> serde::Deserialize<'a>,
{
    // Skip messages without a payload (e.g. tombstones) instead of panicking.
    let Some(payload) = owned_message.payload() else {
        debug!("Skipping message without a payload");
        return;
    };
    let topic = owned_message.topic().to_string();
    let Some(handler) = topics_map.get(topic.as_str()) else {
        debug!("No handler registered for topic: {}", topic);
        return;
    };
    let partition = owned_message.partition();
    let offset = owned_message.offset();
    let headers = extract_headers(&owned_message);

    let metadata = Metadata {
        topic,
        partition,
        offset,
        headers,
    };

    match serde_json::from_slice::<T>(payload) {
        Ok(message) => handler(message, metadata).await,
        Err(error) => error!("Failed to deserialize message payload: {:?}", error),
    }
}

async fn single_handle_message<F, T>(
    owned_message: OwnedMessage,
    handler: &impl Fn(KafkaResult<T>) -> F,
) where
    F: Future<Output = ()> + Send + Sync + 'static,
    T: for<'a> serde::Deserialize<'a>,
{
    // A message without a payload (e.g. a tombstone) is reported as `Ok(None)`.
    let payload = owned_message.payload();
    let Some(payload) = payload else {
        handler(KafkaResult::Ok(None)).await;
        return;
    };

    let topic = owned_message.topic().to_string();
    let partition = owned_message.partition();
    let offset = owned_message.offset();
    let headers = extract_headers(&owned_message);

    let metadata = Metadata {
        topic,
        partition,
        offset,
        headers,
    };

    let message = serde_json::from_slice::<T>(payload);
    match message {
        Ok(message) => {
            handler(KafkaResult::Ok(Some((message, metadata)))).await;
        }
        Err(err) => {
            handler(KafkaResult::Err(Error::Serde(err))).await;
        }
    }
}

async fn handle_error(error: KafkaError) {
    error!("Error while receiving message: {:?}", error);
    if let KafkaError::Global(rdkafka::types::RDKafkaErrorCode::BrokerTransportFailure) = error {
        // Back off before the next poll instead of spinning while the broker is unreachable.
        error!("Broker transport failure");
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

fn extract_headers(owned_message: &OwnedMessage) -> HashMap<String, String> {
    match owned_message.headers() {
        Some(headers) => {
            let mut map = HashMap::new();
            for header in headers.iter() {
                if let Some(value) = header.value {
                    let key = String::from(header.key);
                    // Keep only headers whose values are valid UTF-8 strings.
                    if let Ok(value) = String::from_utf8(value.to_vec()) {
                        map.insert(key, value);
                    }
                }
            }
            map
        }
        None => HashMap::new(),
    }
}

#[cfg(test)]
mod tests {
    use log::info;
    use serde::{Deserialize, Serialize};

    use super::*;

    #[tokio::test]
    async fn create_consumer_test() {
        let mut consumer = Consumer::from("group_id", "localhost:9092").unwrap();
        let handler =
            consumer.subscribe_to_topic("topic", |result: KafkaResult<Data>| async move {
                if let KafkaResult::Ok(Some((data, metadata))) = result {
                    info!("data: {:?}, metadata: {:?}", data, metadata);
                }
            });
        handler.await;
    }

    #[tokio::test]
    async fn create_pausable_consumer_test() {
        let (mut consumer, _is_running) =
            PausableConsumer::from("group_id", "localhost:9092").unwrap();
        let handler_1 = Box::new(|result: KafkaResult<Data>| async move {
            if let KafkaResult::Ok(Some((data, metadata))) = result {
                println!("Handler One ::: data: {:?}, metadata: {:?}", data, metadata);
            }
        });

        consumer
            .subscribe_to_topic("topic", handler_1)
            .await
            .unwrap();
    }
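
    // A small harness for `single_handle_message`, assuming rdkafka's public
    // `OwnedMessage::new` constructor: it feeds a hand-built message through
    // the helper and asserts the handler observed the decoded payload.
    #[tokio::test]
    async fn single_handle_message_test() {
        use std::sync::atomic::{AtomicBool, Ordering};

        use rdkafka::Timestamp;

        let payload = serde_json::to_vec(&Data {
            attra_one: "one".to_string(),
            attra_two: 2,
        })
        .unwrap();
        let message = OwnedMessage::new(
            Some(payload),
            None,
            "topic".to_string(),
            Timestamp::NotAvailable,
            0,
            0,
            None,
        );

        let called = Arc::new(AtomicBool::new(false));
        let called_in_handler = called.clone();
        single_handle_message(message, &move |result: KafkaResult<Data>| {
            let called = called_in_handler.clone();
            async move {
                // The handler should receive the deserialized payload plus metadata.
                assert!(matches!(result, KafkaResult::Ok(Some(_))));
                called.store(true, Ordering::SeqCst);
            }
        })
        .await;
        assert!(called.load(Ordering::SeqCst));
    }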

    #[derive(Serialize, Deserialize, Debug)]
    struct Data {
        attra_one: String,
        attra_two: i8,
    }
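
    // A sanity check for `extract_headers`, assuming the `Header`/`OwnedHeaders`
    // API of recent rdkafka releases: only headers carrying valid UTF-8 values
    // should end up in the resulting map.
    #[test]
    fn extract_headers_test() {
        use rdkafka::message::{Header, OwnedHeaders};
        use rdkafka::Timestamp;

        let headers = OwnedHeaders::new()
            .insert(Header {
                key: "trace-id",
                value: Some("abc-123"),
            })
            .insert(Header {
                key: "binary",
                value: Some(&[0xff, 0xfe][..]),
            });
        let message = OwnedMessage::new(
            None,
            None,
            "topic".to_string(),
            Timestamp::NotAvailable,
            0,
            0,
            Some(headers),
        );

        let map = extract_headers(&message);
        assert_eq!(map.get("trace-id"), Some(&"abc-123".to_string()));
        // The header with a non-UTF-8 value is skipped.
        assert!(!map.contains_key("binary"));
    }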
}