//! High-level Kafka publisher — `high_level_kafka/publisher.rs`.

1use std::{collections::HashMap, fmt::Debug, time::Duration};
2
3use log::error;
4use rdkafka::{
5    message::{Header as KafkaHeader, OwnedHeaders},
6    producer::{FutureProducer, FutureRecord},
7    ClientConfig,
8};
9
10use crate::KafkaError;
11
/// A message to be published to Kafka: destination topic, record headers,
/// a serializable payload and the record key.
#[derive(Debug)]
pub struct Message<T: serde::Serialize + Debug> {
    // Topic the record is published to.
    topic: String,
    // Header key/value pairs attached to the record.
    headers: HashMap<String, String>,
    // Payload; serialized to JSON by `KafkaProducer::produce`.
    data: T,
    // Record key supplied to the producer when publishing.
    key: String,
}
19
20impl<T: serde::Serialize + Debug> Message<T> {
21    ///
22    /// Creates a new Message struct
23    /// # Arguments
24    /// * `topic` - A topic that the message should be published to
25    /// * `headers` - A HashMap that holds the headers that should be published with the message
26    /// * `data` - A generic type that holds the data that should be published, data should be serializable
27    /// * `key` - A key that should be used to publish the message
28    ///
29    pub fn new(topic: String, headers: HashMap<String, String>, data: T, key: String) -> Self {
30        Message {
31            topic,
32            headers,
33            data,
34            key,
35        }
36    }
37}
38
///
/// Configuration options for the producer
///
// NOTE(review): the public name carries a typo ("Optiopns" for "Options");
// renaming would break existing callers, so it is kept as-is.
pub struct ProducerOptiopns<'a> {
    // Comma-separated Kafka bootstrap servers (`bootstrap.servers`).
    bootstrap_servers: String,
    // Value for `message.timeout.ms`, kept as a string for `ClientConfig::set`.
    message_timeout_ms: String,
    // Send-queue timeout in seconds; converted to a `Duration` by the producer.
    queue_timeout_secs: u64,
    // Additional config entries applied verbatim via `ClientConfig::set`.
    other_options: HashMap<&'a str, &'a str>,
}
48
49impl<'a> ProducerOptiopns<'a> {
50    ///
51    /// Creates a new ConsumerOptiopns
52    /// # Arguments
53    /// * `bootstrap_servers` - Comma separated bootstrap servers
54    /// * `message_timeout_ms` - Message timeout in milliseconds
55    /// * `queue_timeout_secs` - Queue timeout in seconds
56    /// * `other_options` - A HashMap that holds other options that should be used to create the consumer
57    ///
58    /// # Example
59    /// ```
60    /// use simple_kafka::ProducerOptiopns;
61    ///
62    /// let consumer_options = ProducerOptiopns::from("localhost:9092".to_string(), "5000".to_string(),5 HashMap::new());
63    /// ```
64    pub fn from(
65        bootstrap_servers: String,
66        message_timeout_ms: String,
67        queue_timeout_secs: u64,
68        other_options: HashMap<&'a str, &'a str>,
69    ) -> Self {
70        ProducerOptiopns {
71            bootstrap_servers,
72            message_timeout_ms,
73            queue_timeout_secs,
74            other_options,
75        }
76    }
77}
78
79///
80/// A Producer that can be use to publish messages to kafka
81///
82///
///
/// A Producer that can be used to publish messages to kafka
///
///
pub struct KafkaProducer {
    // Underlying rdkafka async producer.
    producer: FutureProducer,
    // Send-queue timeout passed to `FutureProducer::send`.
    duration_secs: Duration,
}
87
88impl KafkaProducer {
89    ///
90    /// Creates a KakfkaProducer from a bootstrap_servers string
91    ///
92    /// # Arguments
93    /// * `bootstrap_servers` - Comma separated bootstrap servers
94    /// * `queue_timeout_secs` - Queue timeout in seconds
95    ///
96    /// # Returns
97    /// * `KafkaProducer` - A KafkaProducer that can be used to publish messages to kafka
98    ///
99    /// # Example
100    ///
101    /// ```
102    /// use simple_kafka::KafkaProducer;
103    ///
104    /// let producer = KafkaProducer::from("localhost:9092").unwrap();
105    /// ```
106    pub fn from(bootstrap_servers: &str, queue_timeout_secs: u64) -> Result<Self, KafkaError> {
107        let producer = ClientConfig::new()
108            .set("bootstrap.servers", bootstrap_servers)
109            .set("message.timeout.ms", "5000")
110            .create::<FutureProducer>();
111
112        if let Err(error) = producer {
113            return Err(KafkaError::Kafka(error));
114        }
115
116        let producer = producer.unwrap();
117
118        Ok(KafkaProducer {
119            producer,
120            duration_secs: Duration::from_secs(queue_timeout_secs),
121        })
122    }
123
124    pub fn with_options(producer_options: ProducerOptiopns) -> Result<Self, KafkaError> {
125        let mut config = ClientConfig::new();
126        config.set(
127            "bootstrap.servers",
128            producer_options.bootstrap_servers.as_str(),
129        );
130        config.set(
131            "message.timeout.ms",
132            producer_options.message_timeout_ms.as_str(),
133        );
134
135        producer_options
136            .other_options
137            .iter()
138            .for_each(|(key, value)| {
139                config.set(*key, *value);
140            });
141
142        let producer = config.create::<FutureProducer>();
143
144        if let Err(error) = producer {
145            return Err(KafkaError::Kafka(error));
146        }
147
148        let producer = producer.unwrap();
149
150        Ok(KafkaProducer {
151            producer,
152            duration_secs: Duration::from_secs(producer_options.queue_timeout_secs),
153        })
154    }
155
156    ///
157    /// Publishes a message to a topic
158    ///
159    /// # Arguments
160    /// * `message` - A Message struct that holds the topic, headers, data and key
161    ///
162    /// # Example
163    ///
164    /// ```
165    /// use simple_kafka::{KafkaProducer, Message};
166    /// #[derive(Serialize, Deserialize, Debug)]
167    ///  struct Data {
168    ///     attra_one: String,
169    ///     attra_two: i8,
170    /// }
171    ///
172    /// let producer = KafkaProducer::from("localhost:9092").unwrap();
173    /// let data  = Data {
174    ///     attra_one: "123".to_string(),
175    ///     attra_two: 12,
176    /// };  
177    /// let data = Message::new("topic".to_string(), HashMap::new(), data, "key".to_string());
178    /// let result = producer.produce(data).await;
179    /// ```
180    pub async fn produce<T: serde::Serialize + Debug>(
181        &self,
182        message: Message<T>,
183    ) -> Result<(), KafkaError> {
184        let mut builder = FutureRecord::to(&message.topic).key(message.key.as_str());
185        let mut kafka_headers = OwnedHeaders::new();
186        for (header, value) in message.headers.iter() {
187            kafka_headers = kafka_headers.insert(KafkaHeader {
188                key: header.as_str(),
189                value: Some(value.as_str()),
190            });
191        }
192
193        builder = builder.headers(kafka_headers);
194
195        let restult = serde_json::to_vec(&message.data);
196        if let Err(error) = restult {
197            return Err(KafkaError::Serde(error));
198        }
199
200        let serialized = restult.unwrap();
201
202        let publish_result = self
203            .producer
204            .send(builder.payload(&serialized), self.duration_secs)
205            .await;
206
207        if let Err((error, _)) = publish_result {
208            error!("Unable to send message {:?}, error {}", message, error);
209            return Err(KafkaError::Kafka(error));
210        }
211
212        Ok(())
213    }
214}
215
#[cfg(test)]
mod tests {
    use serde::{Deserialize, Serialize};

    use super::*;

    /// Integration test: publishes one message with headers to `localhost:9092`.
    /// Ignored by default because it needs a live broker; run with
    /// `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires a running Kafka broker on localhost:9092"]
    async fn publish_message_test() {
        let publisher = KafkaProducer::from("localhost:9092", 10).unwrap();
        let data = Data {
            attra_one: "123".to_string(),
            attra_two: 12,
        };

        let mut headers = HashMap::new();
        headers.insert("header_one".to_string(), "value_one".to_string());
        headers.insert("header_two".to_string(), "value_two".to_string());

        let data = Message::new("topic".to_string(), headers, data, "key".to_string());
        let result = publisher.produce(data).await;
        assert!(result.is_ok());
    }

    // Minimal serializable payload used by the test above.
    #[derive(Serialize, Deserialize, Debug)]
    struct Data {
        attra_one: String,
        attra_two: i8,
    }
}