novax_rdkafka/
lib.rs

1use rdkafka::consumer::StreamConsumer;
2use rdkafka::producer::FutureProducer;
3use std::future::Future;
4
5mod consumer;
6mod error;
7mod producer;
8
9use consumer::KafkaConsumer;
10use producer::KafkaProducer;
11
12pub use error::KafkaError as KafkaError;
13
14pub fn kafka_consumer(grp_brokers: (String, String), topics: Vec::<String>) -> Result<StreamConsumer, KafkaError > {
15    let (grp_id, brokers) = grp_brokers;
16    let cons = KafkaConsumer::stream_consumer(grp_id, brokers, topics);
17    cons
18}
19
20pub fn kafka_producer(brokers: String) -> Result<FutureProducer, KafkaError > {
21    let prods = KafkaProducer::future_producer(brokers);
22    prods
23}
24
25#[cfg(test)]
26mod tests {
27    use super::*;
28
29    #[tokio::test]
30    async fn should_consumer_subscribe_ok() -> Result<(), KafkaError > {
31        let grp_brokers = ("group_id".to_string(), "localhost:9193".to_string());
32        let topics = vec!["some_topic".to_string(), "QUIT".to_string()];
33        let con = kafka_consumer(grp_brokers, topics)?;
34        tokio::spawn(async {
35            let _ = KafkaConsumer::stream_subscriber(con).await?;
36            Ok::<(),KafkaError >(())
37        });
38        Ok(())
39    }
40
41    #[tokio::test]
42    async fn should_producer_publish_ok() -> Result<(), KafkaError > {
43        let brokers = "localhost:9193".to_string();
44        let topic = "some_topic".to_string();
45        let prod = kafka_producer(brokers)?;
46        
47        let _ = KafkaProducer::publish(
48            prod.clone(), topic, format!("Message {}", 1).as_bytes().to_vec()
49        ).await?;
50        
51        use tokio::time::{sleep, Duration};
52        sleep(Duration::from_secs(2)).await;
53        let _ = KafkaProducer::publish(
54            prod, "QUIT".to_string(), format!("Message {}", 1).as_bytes().to_vec()
55        ).await?;
56        Ok(())
57    }
58}