cloudevents_sdk_rdkafka/
kafka_consumer_record.rs1use crate::headers;
2use cloudevents::event::SpecVersion;
3use cloudevents::message::{
4 BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
5 Result, StructuredDeserializer, StructuredSerializer,
6};
7use cloudevents::{message, Event};
8use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
9use std::collections::HashMap;
10use std::convert::TryFrom;
11use std::str;
12
13pub struct ConsumerRecordDeserializer {
15 pub(crate) headers: HashMap<String, Vec<u8>>,
16 pub(crate) payload: Option<Vec<u8>>,
17}
18
19impl ConsumerRecordDeserializer {
20 fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
21 let mut hm = HashMap::new();
22 let headers = message
23 .headers()
24 .ok_or(cloudevents::message::Error::WrongEncoding {})?;
26 for i in 0..headers.count() {
27 let header = headers.get(i).unwrap();
28 hm.insert(header.0.to_string(), Vec::from(header.1));
29 }
30 Ok(hm)
31 }
32
33 pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
34 Ok(ConsumerRecordDeserializer {
35 headers: Self::get_kafka_headers(message)?,
36 payload: message.payload().map(Vec::from),
37 })
38 }
39}
40
41impl BinaryDeserializer for ConsumerRecordDeserializer {
42 fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
43 if self.encoding() != Encoding::BINARY {
44 return Err(message::Error::WrongEncoding {});
45 }
46
47 let spec_version = SpecVersion::try_from(
48 str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
49 .map_err(|e| cloudevents::message::Error::Other {
50 source: Box::new(e),
51 })?,
52 )?;
53
54 visitor = visitor.set_spec_version(spec_version.clone())?;
55
56 let attributes = spec_version.attribute_names();
57
58 if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
59 visitor = visitor.set_attribute(
60 "datacontenttype",
61 MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
62 cloudevents::message::Error::Other {
63 source: Box::new(e),
64 }
65 })?),
66 )?
67 }
68
69 for (hn, hv) in self
70 .headers
71 .into_iter()
72 .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
73 {
74 let name = &hn["ce_".len()..];
75
76 if attributes.contains(&name) {
77 visitor = visitor.set_attribute(
78 name,
79 MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
80 cloudevents::message::Error::Other {
81 source: Box::new(e),
82 }
83 })?),
84 )?
85 } else {
86 visitor = visitor.set_extension(
87 name,
88 MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
89 cloudevents::message::Error::Other {
90 source: Box::new(e),
91 }
92 })?),
93 )?
94 }
95 }
96
97 if self.payload != None {
98 visitor.end_with_data(self.payload.unwrap())
99 } else {
100 visitor.end()
101 }
102 }
103}
104
105impl StructuredDeserializer for ConsumerRecordDeserializer {
106 fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
107 if self.encoding() != Encoding::STRUCTURED {
108 return Err(message::Error::WrongEncoding {});
109 }
110 visitor.set_structured_event(self.payload.unwrap())
111 }
112}
113
114impl MessageDeserializer for ConsumerRecordDeserializer {
115 fn encoding(&self) -> Encoding {
116 match (
117 self.headers
118 .get("content-type")
119 .map(|s| String::from_utf8(s.to_vec()).ok())
120 .flatten()
121 .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
122 .unwrap_or(false),
123 self.headers.get(headers::SPEC_VERSION_HEADER),
124 ) {
125 (true, _) => Encoding::STRUCTURED,
126 (_, Some(_)) => Encoding::BINARY,
127 _ => Encoding::UNKNOWN,
128 }
129 }
130}
131
132pub fn record_to_event(msg: &impl Message) -> Result<Event> {
134 MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
135}
136
137pub trait MessageExt: private::Sealed {
141 fn to_event(&self) -> Result<Event>;
143}
144
145impl MessageExt for BorrowedMessage<'_> {
146 fn to_event(&self) -> Result<Event> {
147 record_to_event(self)
148 }
149}
150
151impl MessageExt for OwnedMessage {
152 fn to_event(&self) -> Result<Event> {
153 record_to_event(self)
154 }
155}
156
157mod private {
158 pub trait Sealed {}
160 impl Sealed for rdkafka::message::OwnedMessage {}
161 impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
162}
163
164#[cfg(test)]
165mod tests {
166 use super::*;
167 use crate::kafka_producer_record::MessageRecord;
168
169 use chrono::Utc;
170 use cloudevents::{EventBuilder, EventBuilderV10};
171 use serde_json::json;
172
173 #[test]
174 fn test_binary_record() {
175 let time = Utc::now();
176
177 let expected = EventBuilderV10::new()
178 .id("0001")
179 .ty("example.test")
180 .time(time)
181 .source("http://localhost")
182 .extension("someint", "10")
183 .build()
184 .unwrap();
185
186 let message_record = MessageRecord::from_event(
192 EventBuilderV10::new()
193 .id("0001")
194 .ty("example.test")
195 .time(time)
196 .source("http://localhost")
197 .extension("someint", "10")
198 .build()
199 .unwrap(),
200 )
201 .unwrap();
202
203 let owned_message = OwnedMessage::new(
204 message_record.payload,
205 Some(String::from("test key").into_bytes()),
206 String::from("test topic"),
207 rdkafka::message::Timestamp::NotAvailable,
208 10,
209 10,
210 Some(message_record.headers),
211 );
212
213 assert_eq!(owned_message.to_event().unwrap(), expected)
214 }
215
216 #[test]
217 fn test_structured_record() {
218 let j = json!({"hello": "world"});
219
220 let expected = EventBuilderV10::new()
221 .id("0001")
222 .ty("example.test")
223 .source("http://localhost")
224 .data("application/json", j.clone())
225 .extension("someint", "10")
226 .build()
227 .unwrap();
228
229 let input = EventBuilderV10::new()
235 .id("0001")
236 .ty("example.test")
237 .source("http://localhost")
238 .data("application/json", j.clone())
239 .extension("someint", "10")
240 .build()
241 .unwrap();
242
243 let serialized_event =
244 StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap();
245
246 let owned_message = OwnedMessage::new(
247 serialized_event.payload,
248 Some(String::from("test key").into_bytes()),
249 String::from("test topic"),
250 rdkafka::message::Timestamp::NotAvailable,
251 10,
252 10,
253 Some(serialized_event.headers),
254 );
255
256 assert_eq!(owned_message.to_event().unwrap(), expected)
257 }
258}