1use rdkafka::consumer::StreamConsumer;
2use rdkafka::producer::FutureProducer;
3use std::future::Future;
4
5mod consumer;
6mod error;
7mod producer;
8
9use consumer::KafkaConsumer;
10use producer::KafkaProducer;
11
12pub use error::KafkaError as KafkaError;
13
14pub fn kafka_consumer(grp_brokers: (String, String), topics: Vec::<String>) -> Result<StreamConsumer, KafkaError > {
15 let (grp_id, brokers) = grp_brokers;
16 let cons = KafkaConsumer::stream_consumer(grp_id, brokers, topics);
17 cons
18}
19
20pub fn kafka_producer(brokers: String) -> Result<FutureProducer, KafkaError > {
21 let prods = KafkaProducer::future_producer(brokers);
22 prods
23}
24
25#[cfg(test)]
26mod tests {
27 use super::*;
28
29 #[tokio::test]
30 async fn should_consumer_subscribe_ok() -> Result<(), KafkaError > {
31 let grp_brokers = ("group_id".to_string(), "localhost:9193".to_string());
32 let topics = vec!["some_topic".to_string(), "QUIT".to_string()];
33 let con = kafka_consumer(grp_brokers, topics)?;
34 tokio::spawn(async {
35 let _ = KafkaConsumer::stream_subscriber(con).await?;
36 Ok::<(),KafkaError >(())
37 });
38 Ok(())
39 }
40
41 #[tokio::test]
42 async fn should_producer_publish_ok() -> Result<(), KafkaError > {
43 let brokers = "localhost:9193".to_string();
44 let topic = "some_topic".to_string();
45 let prod = kafka_producer(brokers)?;
46
47 let _ = KafkaProducer::publish(
48 prod.clone(), topic, format!("Message {}", 1).as_bytes().to_vec()
49 ).await?;
50
51 use tokio::time::{sleep, Duration};
52 sleep(Duration::from_secs(2)).await;
53 let _ = KafkaProducer::publish(
54 prod, "QUIT".to_string(), format!("Message {}", 1).as_bytes().to_vec()
55 ).await?;
56 Ok(())
57 }
58}