high_level_kafka/
publisher.rs1use std::{collections::HashMap, fmt::Debug, time::Duration};
2
3use log::error;
4use rdkafka::{
5 message::{Header as KafkaHeader, OwnedHeaders},
6 producer::{FutureProducer, FutureRecord},
7 ClientConfig,
8};
9
10use crate::KafkaError;
11
/// A single record to be published to Kafka.
///
/// The struct itself places no bounds on `T`; the `Serialize + Debug`
/// requirements live on the `impl` blocks and functions that actually need
/// them, so the type can be named in signatures without repeating the bounds.
#[derive(Debug)]
pub struct Message<T> {
    /// Name of the topic the record is sent to.
    topic: String,
    /// Header name/value pairs attached to the record.
    headers: HashMap<String, String>,
    /// Payload of the record; `KafkaProducer::produce` serializes it to JSON.
    data: T,
    /// Key of the record.
    key: String,
}
19
20impl<T: serde::Serialize + Debug> Message<T> {
21 pub fn new(topic: String, headers: HashMap<String, String>, data: T, key: String) -> Self {
30 Message {
31 topic,
32 headers,
33 data,
34 key,
35 }
36 }
37}
38
/// Configuration used by `KafkaProducer::with_options` to build a producer.
///
/// NOTE: the type name contains a typo ("Optiopns"); it is kept as-is because
/// it is part of the public interface.
#[derive(Debug, Clone)]
pub struct ProducerOptiopns<'a> {
    /// Value for the `bootstrap.servers` client setting.
    bootstrap_servers: String,
    /// Value for the `message.timeout.ms` client setting, passed through as a string.
    message_timeout_ms: String,
    /// Timeout, in seconds, that the producer hands to every send call.
    queue_timeout_secs: u64,
    /// Additional client settings (key -> value). They are applied after the
    /// two named settings above, so a matching key here replaces them.
    other_options: HashMap<&'a str, &'a str>,
}
48
49impl<'a> ProducerOptiopns<'a> {
50 pub fn from(
65 bootstrap_servers: String,
66 message_timeout_ms: String,
67 queue_timeout_secs: u64,
68 other_options: HashMap<&'a str, &'a str>,
69 ) -> Self {
70 ProducerOptiopns {
71 bootstrap_servers,
72 message_timeout_ms,
73 queue_timeout_secs,
74 other_options,
75 }
76 }
77}
78
79pub struct KafkaProducer {
84 producer: FutureProducer,
85 duration_secs: Duration,
86}
87
88impl KafkaProducer {
89 pub fn from(bootstrap_servers: &str, queue_timeout_secs: u64) -> Result<Self, KafkaError> {
107 let producer = ClientConfig::new()
108 .set("bootstrap.servers", bootstrap_servers)
109 .set("message.timeout.ms", "5000")
110 .create::<FutureProducer>();
111
112 if let Err(error) = producer {
113 return Err(KafkaError::Kafka(error));
114 }
115
116 let producer = producer.unwrap();
117
118 Ok(KafkaProducer {
119 producer,
120 duration_secs: Duration::from_secs(queue_timeout_secs),
121 })
122 }
123
124 pub fn with_options(producer_options: ProducerOptiopns) -> Result<Self, KafkaError> {
125 let mut config = ClientConfig::new();
126 config.set(
127 "bootstrap.servers",
128 producer_options.bootstrap_servers.as_str(),
129 );
130 config.set(
131 "message.timeout.ms",
132 producer_options.message_timeout_ms.as_str(),
133 );
134
135 producer_options
136 .other_options
137 .iter()
138 .for_each(|(key, value)| {
139 config.set(*key, *value);
140 });
141
142 let producer = config.create::<FutureProducer>();
143
144 if let Err(error) = producer {
145 return Err(KafkaError::Kafka(error));
146 }
147
148 let producer = producer.unwrap();
149
150 Ok(KafkaProducer {
151 producer,
152 duration_secs: Duration::from_secs(producer_options.queue_timeout_secs),
153 })
154 }
155
156 pub async fn produce<T: serde::Serialize + Debug>(
181 &self,
182 message: Message<T>,
183 ) -> Result<(), KafkaError> {
184 let mut builder = FutureRecord::to(&message.topic).key(message.key.as_str());
185 let mut kafka_headers = OwnedHeaders::new();
186 for (header, value) in message.headers.iter() {
187 kafka_headers = kafka_headers.insert(KafkaHeader {
188 key: header.as_str(),
189 value: Some(value.as_str()),
190 });
191 }
192
193 builder = builder.headers(kafka_headers);
194
195 let restult = serde_json::to_vec(&message.data);
196 if let Err(error) = restult {
197 return Err(KafkaError::Serde(error));
198 }
199
200 let serialized = restult.unwrap();
201
202 let publish_result = self
203 .producer
204 .send(builder.payload(&serialized), self.duration_secs)
205 .await;
206
207 if let Err((error, _)) = publish_result {
208 error!("Unable to send message {:?}, error {}", message, error);
209 return Err(KafkaError::Kafka(error));
210 }
211
212 Ok(())
213 }
214}
215
#[cfg(test)]
mod tests {
    use serde::{Deserialize, Serialize};

    use super::*;

    /// Minimal serializable payload used by the publishing test.
    #[derive(Serialize, Deserialize, Debug)]
    struct Data {
        attra_one: String,
        attra_two: i8,
    }

    /// Publishes one message through a producer pointed at `localhost:9092`
    /// and expects the send to succeed.
    // NOTE(review): presumably needs a broker listening on that address to pass — confirm.
    #[tokio::test]
    async fn publish_message_test() {
        let publisher = KafkaProducer::from("localhost:9092", 10).unwrap();

        let payload = Data {
            attra_one: "123".to_string(),
            attra_two: 12,
        };

        let headers: HashMap<String, String> =
            [("header_one", "value_one"), ("header_two", "value_two")]
                .iter()
                .map(|(name, value)| (name.to_string(), value.to_string()))
                .collect();

        let message = Message::new("topic".to_string(), headers, payload, "key".to_string());

        assert!(publisher.produce(message).await.is_ok());
    }
}