cloudevents_sdk_rdkafka/
kafka_consumer_record.rs

1use crate::headers;
2use cloudevents::event::SpecVersion;
3use cloudevents::message::{
4    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
5    Result, StructuredDeserializer, StructuredSerializer,
6};
7use cloudevents::{message, Event};
8use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
9use std::collections::HashMap;
10use std::convert::TryFrom;
11use std::str;
12
/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait.
///
/// Holds owned copies of a record's headers and payload, so the deserializer does not
/// borrow from the message it was created from.
pub struct ConsumerRecordDeserializer {
    /// Header values as raw bytes, keyed by header name.
    pub(crate) headers: HashMap<String, Vec<u8>>,
    /// Raw record payload; `None` when the message has no payload.
    pub(crate) payload: Option<Vec<u8>>,
}
18
19impl ConsumerRecordDeserializer {
20    fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
21        let mut hm = HashMap::new();
22        let headers = message
23            .headers()
24            // TODO create an error variant for invalid headers
25            .ok_or(cloudevents::message::Error::WrongEncoding {})?;
26        for i in 0..headers.count() {
27            let header = headers.get(i).unwrap();
28            hm.insert(header.0.to_string(), Vec::from(header.1));
29        }
30        Ok(hm)
31    }
32
33    pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
34        Ok(ConsumerRecordDeserializer {
35            headers: Self::get_kafka_headers(message)?,
36            payload: message.payload().map(Vec::from),
37        })
38    }
39}
40
41impl BinaryDeserializer for ConsumerRecordDeserializer {
42    fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
43        if self.encoding() != Encoding::BINARY {
44            return Err(message::Error::WrongEncoding {});
45        }
46
47        let spec_version = SpecVersion::try_from(
48            str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
49                .map_err(|e| cloudevents::message::Error::Other {
50                    source: Box::new(e),
51                })?,
52        )?;
53
54        visitor = visitor.set_spec_version(spec_version.clone())?;
55
56        let attributes = spec_version.attribute_names();
57
58        if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
59            visitor = visitor.set_attribute(
60                "datacontenttype",
61                MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
62                    cloudevents::message::Error::Other {
63                        source: Box::new(e),
64                    }
65                })?),
66            )?
67        }
68
69        for (hn, hv) in self
70            .headers
71            .into_iter()
72            .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
73        {
74            let name = &hn["ce_".len()..];
75
76            if attributes.contains(&name) {
77                visitor = visitor.set_attribute(
78                    name,
79                    MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
80                        cloudevents::message::Error::Other {
81                            source: Box::new(e),
82                        }
83                    })?),
84                )?
85            } else {
86                visitor = visitor.set_extension(
87                    name,
88                    MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
89                        cloudevents::message::Error::Other {
90                            source: Box::new(e),
91                        }
92                    })?),
93                )?
94            }
95        }
96
97        if self.payload != None {
98            visitor.end_with_data(self.payload.unwrap())
99        } else {
100            visitor.end()
101        }
102    }
103}
104
105impl StructuredDeserializer for ConsumerRecordDeserializer {
106    fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
107        if self.encoding() != Encoding::STRUCTURED {
108            return Err(message::Error::WrongEncoding {});
109        }
110        visitor.set_structured_event(self.payload.unwrap())
111    }
112}
113
114impl MessageDeserializer for ConsumerRecordDeserializer {
115    fn encoding(&self) -> Encoding {
116        match (
117            self.headers
118                .get("content-type")
119                .map(|s| String::from_utf8(s.to_vec()).ok())
120                .flatten()
121                .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
122                .unwrap_or(false),
123            self.headers.get(headers::SPEC_VERSION_HEADER),
124        ) {
125            (true, _) => Encoding::STRUCTURED,
126            (_, Some(_)) => Encoding::BINARY,
127            _ => Encoding::UNKNOWN,
128        }
129    }
130}
131
132/// Method to transform a [`Message`] to [`Event`].
133pub fn record_to_event(msg: &impl Message) -> Result<Event> {
134    MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
135}
136
137/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`].
138///
139/// This trait is sealed and cannot be implemented for types outside of this crate.
140pub trait MessageExt: private::Sealed {
141    /// Generates [`Event`] from [`BorrowedMessage`].
142    fn to_event(&self) -> Result<Event>;
143}
144
145impl MessageExt for BorrowedMessage<'_> {
146    fn to_event(&self) -> Result<Event> {
147        record_to_event(self)
148    }
149}
150
151impl MessageExt for OwnedMessage {
152    fn to_event(&self) -> Result<Event> {
153        record_to_event(self)
154    }
155}
156
157mod private {
158    // Sealing the MessageExt
159    pub trait Sealed {}
160    impl Sealed for rdkafka::message::OwnedMessage {}
161    impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kafka_producer_record::MessageRecord;

    use chrono::Utc;
    use cloudevents::{EventBuilder, EventBuilderV10};
    use serde_json::json;

    /// Wraps a serialized record into an `OwnedMessage` with fixed key, topic, partition
    /// and offset.
    ///
    /// rust-rdkafka provides no way to convert a `FutureProducer` record back into an
    /// `OwnedMessage` or `BorrowedMessage`, nor a way to create a `BorrowedMessage`, so the
    /// tests use `OwnedMessage`, which consumes the data instead of borrowing it.
    fn owned_message_from(record: MessageRecord) -> OwnedMessage {
        OwnedMessage::new(
            record.payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(record.headers),
        )
    }

    /// A binary-mode record must deserialize back to the event it was produced from.
    #[test]
    fn test_binary_record() {
        let expected = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .time(Utc::now())
            .source("http://localhost")
            .extension("someint", "10")
            .build()
            .unwrap();

        let record = MessageRecord::from_event(expected.clone()).unwrap();
        let owned_message = owned_message_from(record);

        assert_eq!(owned_message.to_event().unwrap(), expected)
    }

    /// A structured-mode record must deserialize back to the event it was produced from.
    #[test]
    fn test_structured_record() {
        let expected = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .source("http://localhost")
            .data("application/json", json!({"hello": "world"}))
            .extension("someint", "10")
            .build()
            .unwrap();

        let record =
            StructuredDeserializer::deserialize_structured(expected.clone(), MessageRecord::new())
                .unwrap();
        let owned_message = owned_message_from(record);

        assert_eq!(owned_message.to_event().unwrap(), expected)
    }
}