cloudevents/binding/rdkafka/
kafka_consumer_record.rs1use rdkafka_lib as rdkafka;
2
3use crate::binding::{kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE};
4use crate::event::SpecVersion;
5use crate::message::{
6 BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
7 Result, StructuredDeserializer, StructuredSerializer,
8};
9use crate::{message, Event};
10use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
11use std::collections::HashMap;
12use std::convert::TryFrom;
13use std::str;
14
/// Owned copy of the parts of a Kafka record needed to decode a CloudEvent.
///
/// Instances are built with [`ConsumerRecordDeserializer::new`], which copies
/// the headers and the payload out of the source message, so the value does
/// not borrow from the record it was created from.
#[derive(Debug)]
pub struct ConsumerRecordDeserializer {
    /// Record headers, keyed by header name, holding the raw header bytes.
    pub(crate) headers: HashMap<String, Vec<u8>>,
    /// Raw record payload; `None` when the record carried no payload.
    pub(crate) payload: Option<Vec<u8>>,
}
20
21impl ConsumerRecordDeserializer {
22 fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
23 match message.headers() {
24 None => Err(crate::message::Error::WrongEncoding {}),
25 Some(headers) => Ok(headers
26 .iter()
27 .map(|h| (h.key.to_string(), Vec::from(h.value.unwrap())))
28 .collect()),
29 }
30 }
31
32 pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
33 Ok(ConsumerRecordDeserializer {
34 headers: Self::get_kafka_headers(message)?,
35 payload: message.payload().map(Vec::from),
36 })
37 }
38}
39
40impl BinaryDeserializer for ConsumerRecordDeserializer {
41 fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
42 if self.encoding() != Encoding::BINARY {
43 return Err(message::Error::WrongEncoding {});
44 }
45
46 let spec_version = SpecVersion::try_from(
47 str::from_utf8(&self.headers.remove(SPEC_VERSION_HEADER).unwrap()).map_err(|e| {
48 crate::message::Error::Other {
49 source: Box::new(e),
50 }
51 })?,
52 )?;
53
54 let attributes = spec_version.attribute_names();
55
56 visitor = visitor.set_spec_version(spec_version)?;
57
58 if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
59 visitor = visitor.set_attribute(
60 "datacontenttype",
61 MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
62 crate::message::Error::Other {
63 source: Box::new(e),
64 }
65 })?),
66 )?
67 }
68
69 for (hn, hv) in self
70 .headers
71 .into_iter()
72 .filter(|(hn, _)| SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
73 {
74 let name = &hn["ce_".len()..];
75
76 if attributes.contains(&name) {
77 visitor = visitor.set_attribute(
78 name,
79 MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
80 crate::message::Error::Other {
81 source: Box::new(e),
82 }
83 })?),
84 )?
85 } else {
86 visitor = visitor.set_extension(
87 name,
88 MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
89 crate::message::Error::Other {
90 source: Box::new(e),
91 }
92 })?),
93 )?
94 }
95 }
96
97 if self.payload.is_some() {
98 visitor.end_with_data(self.payload.unwrap())
99 } else {
100 visitor.end()
101 }
102 }
103}
104
105impl StructuredDeserializer for ConsumerRecordDeserializer {
106 fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
107 if self.encoding() != Encoding::STRUCTURED {
108 return Err(message::Error::WrongEncoding {});
109 }
110 visitor.set_structured_event(self.payload.unwrap())
111 }
112}
113
114impl MessageDeserializer for ConsumerRecordDeserializer {
115 fn encoding(&self) -> Encoding {
116 match (
117 self.headers
118 .get("content-type")
119 .and_then(|s| String::from_utf8(s.to_vec()).ok())
120 .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
121 .unwrap_or(false),
122 self.headers.get(SPEC_VERSION_HEADER),
123 ) {
124 (true, _) => Encoding::STRUCTURED,
125 (_, Some(_)) => Encoding::BINARY,
126 _ => Encoding::UNKNOWN,
127 }
128 }
129}
130
131pub fn record_to_event(msg: &impl Message) -> Result<Event> {
133 MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
134}
135
136pub trait MessageExt: private::Sealed {
140 fn to_event(&self) -> Result<Event>;
142}
143
144impl MessageExt for BorrowedMessage<'_> {
145 fn to_event(&self) -> Result<Event> {
146 record_to_event(self)
147 }
148}
149
150impl MessageExt for OwnedMessage {
151 fn to_event(&self) -> Result<Event> {
152 record_to_event(self)
153 }
154}
155
156mod private {
157 use rdkafka_lib as rdkafka;
158
159 pub trait Sealed {}
161 impl Sealed for rdkafka::message::OwnedMessage {}
162 impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
163}
164
#[cfg(test)]
mod tests {
    use rdkafka_lib as rdkafka;

    use super::*;
    use crate::binding::rdkafka::kafka_producer_record::MessageRecord;

    use crate::test::fixtures;
    use crate::{EventBuilder, EventBuilderV10};

    /// Builds an `OwnedMessage` around `payload` and `headers`, using the same
    /// fixed key, topic, timestamp, partition and offset for every test.
    fn owned_message(
        payload: Option<Vec<u8>>,
        headers: rdkafka::message::OwnedHeaders,
    ) -> OwnedMessage {
        OwnedMessage::new(
            payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(headers),
        )
    }

    /// An event serialized by `MessageRecord::from_event` must decode back to
    /// the matching fixture.
    #[test]
    fn test_binary_record() {
        let expected = fixtures::v10::minimal_string_extension();

        let event = EventBuilderV10::new()
            .id("0001")
            .ty("test_event.test_application")
            .source("http://localhost/")
            .extension("someint", "10")
            .build()
            .unwrap();
        let record = MessageRecord::from_event(event).unwrap();

        let message = owned_message(record.payload, record.headers);
        let actual = message.to_event().unwrap();

        assert_eq!(actual, expected)
    }

    /// An event serialized in structured mode must decode back to itself.
    #[test]
    fn test_structured_record() {
        let expected = fixtures::v10::full_json_data_string_extension();

        let record =
            StructuredDeserializer::deserialize_structured(expected.clone(), MessageRecord::new())
                .unwrap();

        let message = owned_message(record.payload, record.headers);
        let actual = message.to_event().unwrap();

        assert_eq!(actual, expected)
    }
}