1use kafka::consumer::Consumer;
2use kafka::producer::Producer;
3
4mod error;
5mod consumer;
6mod producer;
7
8pub use error::KafkaError as KafkaError;
9
10use consumer::KafkaConsumer;
11use producer::KafkaProducer;
12
13pub fn kafka_consumer(grp_brokers: (String, String), topics: Vec<String>) -> Result<Consumer, KafkaError > {
14 let (grp_id, brokers) = grp_brokers;
15 let cons = KafkaConsumer::stream_consumer(grp_id, vec![brokers], topics);
16 cons
17}
18
19pub fn kafka_producer(brokers: Vec<String>) -> Result<Producer, KafkaError > {
20 let prods = KafkaProducer::producer(brokers);
21 prods
22}
23
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): both tests address a broker at `localhost:9193`; they
    // presumably need a running Kafka instance to pass — confirm for CI.

    /// Builds a consumer for two topics and hands it to
    /// `KafkaConsumer::stream_subscriber` on a spawned task.
    #[tokio::test]
    async fn should_consumer_subscribe_ok() -> Result<(), KafkaError> {
        let topics = vec!["some_topic".to_string(), "QUIT".to_string()];
        let grp_brokers = ("group_id".to_string(), "localhost:9193".to_string());
        let consumer = kafka_consumer(grp_brokers, topics)?;

        // The join handle is dropped, so the task is detached: this test only
        // verifies that the consumer could be created, not the subscriber's
        // outcome.
        // NOTE(review): awaiting the handle would surface subscriber errors,
        // but may block if the subscriber runs until a stop signal — confirm.
        tokio::spawn(async {
            let _ = KafkaConsumer::stream_subscriber(consumer).await?;
            Ok::<(), KafkaError>(())
        });

        Ok(())
    }

    /// Builds a producer and publishes a three-record batch through
    /// `KafkaProducer::publish`.
    #[tokio::test]
    async fn should_producer_publish_ok() -> Result<(), KafkaError> {
        let producer = kafka_producer(vec!["localhost:9193".to_string()])?;

        // Every record in the batch targets partition 0.
        let record = |key: &[u8], value: &[u8], topic: &'static str| kafka::producer::Record {
            key: key.to_vec(),
            value: value.to_vec(),
            topic,
            partition: 0,
        };

        // NOTE(review): the first record appears twice — confirm the duplicate
        // is intentional.
        let batch = vec![
            record(b"key0", b"value0", "some_topic"),
            record(b"key0", b"value0", "some_topic"),
            record(b"key1", b"value1", "QUIT"),
        ];

        let _ = KafkaProducer::publish(producer, batch).await?;
        Ok(())
    }
}