Skip to main content

novax_kafka/
lib.rs

1use kafka::consumer::Consumer;
2use kafka::producer::Producer;
3
4mod error;
5mod consumer;
6mod producer;
7
8pub use error::KafkaError as KafkaError;
9
10use consumer::KafkaConsumer;
11use producer::KafkaProducer;
12
13pub fn kafka_consumer(grp_brokers: (String, String), topics: Vec<String>) -> Result<Consumer, KafkaError > {
14    let (grp_id, brokers) = grp_brokers;
15    let cons = KafkaConsumer::stream_consumer(grp_id, vec![brokers], topics);
16    cons
17}
18
19pub fn kafka_producer(brokers: Vec<String>) -> Result<Producer, KafkaError >  {
20    let prods = KafkaProducer::producer(brokers);
21    prods
22}
23
#[cfg(test)]
mod tests {
    use super::*;

    use kafka::producer::Record;

    /// Broker address used by both tests.
    /// NOTE(review): presumably a Kafka broker must be reachable here for the
    /// tests to pass — confirm against the project's test setup.
    const BROKER: &str = "localhost:9193";

    /// Builds a partition-0 record for `topic` from borrowed key/value bytes.
    fn record(
        topic: &'static str,
        key: &[u8],
        value: &[u8],
    ) -> Record<'static, Vec<u8>, Vec<u8>> {
        Record {
            key: key.to_vec(),
            value: value.to_vec(),
            topic,
            partition: 0,
        }
    }

    /// Creates a consumer for two topics and starts the subscriber on a
    /// spawned task.
    #[tokio::test]
    async fn should_consumer_subscribe_ok() -> Result<(), KafkaError> {
        let topics: Vec<String> = ["some_topic", "QUIT"]
            .iter()
            .copied()
            .map(String::from)
            .collect();
        let consumer = kafka_consumer(("group_id".to_string(), BROKER.to_string()), topics)?;

        // The join handle is discarded, so this test returns without waiting
        // for the subscriber task.
        // NOTE(review): the task's result is therefore never observed here —
        // confirm that is intended.
        tokio::spawn(async move {
            let _ = KafkaConsumer::stream_subscriber(consumer).await?;
            Ok::<(), KafkaError>(())
        });

        Ok(())
    }

    /// Creates a producer and publishes a three-record batch.
    #[tokio::test]
    async fn should_producer_publish_ok() -> Result<(), KafkaError> {
        let producer = kafka_producer(vec![BROKER.to_string()])?;

        // The first two records are intentionally identical, matching the
        // original batch.
        let records = vec![
            record("some_topic", b"key0", b"value0"),
            record("some_topic", b"key0", b"value0"),
            record("QUIT", b"key1", b"value1"),
        ];

        let _ = KafkaProducer::publish(producer, records).await?;
        Ok(())
    }
}