cloudevents/binding/rdkafka/
kafka_consumer_record.rs

1use rdkafka_lib as rdkafka;
2
3use crate::binding::{kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE};
4use crate::event::SpecVersion;
5use crate::message::{
6    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
7    Result, StructuredDeserializer, StructuredSerializer,
8};
9use crate::{message, Event};
10use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
11use std::collections::HashMap;
12use std::convert::TryFrom;
13use std::str;
14
/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait.
///
/// The wrapper owns copies of the record's headers and payload, so it does not
/// borrow from the Kafka message it was built from.
#[derive(Debug, Clone)]
pub struct ConsumerRecordDeserializer {
    /// Header values of the record, keyed by header name.
    pub(crate) headers: HashMap<String, Vec<u8>>,
    /// Payload of the record, or `None` when the record carries no payload.
    pub(crate) payload: Option<Vec<u8>>,
}
20
21impl ConsumerRecordDeserializer {
22    fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
23        match message.headers() {
24            None => Err(crate::message::Error::WrongEncoding {}),
25            Some(headers) => Ok(headers
26                .iter()
27                .map(|h| (h.key.to_string(), Vec::from(h.value.unwrap())))
28                .collect()),
29        }
30    }
31
32    pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
33        Ok(ConsumerRecordDeserializer {
34            headers: Self::get_kafka_headers(message)?,
35            payload: message.payload().map(Vec::from),
36        })
37    }
38}
39
40impl BinaryDeserializer for ConsumerRecordDeserializer {
41    fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
42        if self.encoding() != Encoding::BINARY {
43            return Err(message::Error::WrongEncoding {});
44        }
45
46        let spec_version = SpecVersion::try_from(
47            str::from_utf8(&self.headers.remove(SPEC_VERSION_HEADER).unwrap()).map_err(|e| {
48                crate::message::Error::Other {
49                    source: Box::new(e),
50                }
51            })?,
52        )?;
53
54        let attributes = spec_version.attribute_names();
55
56        visitor = visitor.set_spec_version(spec_version)?;
57
58        if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
59            visitor = visitor.set_attribute(
60                "datacontenttype",
61                MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
62                    crate::message::Error::Other {
63                        source: Box::new(e),
64                    }
65                })?),
66            )?
67        }
68
69        for (hn, hv) in self
70            .headers
71            .into_iter()
72            .filter(|(hn, _)| SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
73        {
74            let name = &hn["ce_".len()..];
75
76            if attributes.contains(&name) {
77                visitor = visitor.set_attribute(
78                    name,
79                    MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
80                        crate::message::Error::Other {
81                            source: Box::new(e),
82                        }
83                    })?),
84                )?
85            } else {
86                visitor = visitor.set_extension(
87                    name,
88                    MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
89                        crate::message::Error::Other {
90                            source: Box::new(e),
91                        }
92                    })?),
93                )?
94            }
95        }
96
97        if self.payload.is_some() {
98            visitor.end_with_data(self.payload.unwrap())
99        } else {
100            visitor.end()
101        }
102    }
103}
104
105impl StructuredDeserializer for ConsumerRecordDeserializer {
106    fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
107        if self.encoding() != Encoding::STRUCTURED {
108            return Err(message::Error::WrongEncoding {});
109        }
110        visitor.set_structured_event(self.payload.unwrap())
111    }
112}
113
114impl MessageDeserializer for ConsumerRecordDeserializer {
115    fn encoding(&self) -> Encoding {
116        match (
117            self.headers
118                .get("content-type")
119                .and_then(|s| String::from_utf8(s.to_vec()).ok())
120                .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
121                .unwrap_or(false),
122            self.headers.get(SPEC_VERSION_HEADER),
123        ) {
124            (true, _) => Encoding::STRUCTURED,
125            (_, Some(_)) => Encoding::BINARY,
126            _ => Encoding::UNKNOWN,
127        }
128    }
129}
130
131/// Method to transform a [`Message`] to [`Event`].
132pub fn record_to_event(msg: &impl Message) -> Result<Event> {
133    MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
134}
135
/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`].
///
/// It is implemented for [`BorrowedMessage`] and [`OwnedMessage`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait MessageExt: private::Sealed {
    /// Generates an [`Event`] from this message.
    ///
    /// Returns an error when the message cannot be converted into an event.
    fn to_event(&self) -> Result<Event>;
}
143
/// [`MessageExt`] for messages borrowed from a consumer.
impl MessageExt for BorrowedMessage<'_> {
    /// Delegates to [`record_to_event()`].
    fn to_event(&self) -> Result<Event> {
        record_to_event(self)
    }
}
149
/// [`MessageExt`] for owned messages.
impl MessageExt for OwnedMessage {
    /// Delegates to [`record_to_event()`].
    fn to_event(&self) -> Result<Event> {
        record_to_event(self)
    }
}
155
// Private module holding the `Sealed` marker trait. Because the module is not
// public, code outside this crate cannot name `Sealed` and therefore cannot
// implement `MessageExt`.
mod private {
    use rdkafka_lib as rdkafka;

    // Sealing the MessageExt
    pub trait Sealed {}
    // Only the two rdkafka message types below may implement `MessageExt`.
    impl Sealed for rdkafka::message::OwnedMessage {}
    impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
}
164
#[cfg(test)]
mod tests {
    use rdkafka_lib as rdkafka;

    use super::*;
    use crate::binding::rdkafka::kafka_producer_record::MessageRecord;

    use crate::test::fixtures;
    use crate::{EventBuilder, EventBuilderV10};

    /// Builds the message that the tests deserialize.
    ///
    /// Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into
    /// OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct,
    /// the tests use OwnedMessage instead, which consumes the message instead of borrowing it like
    /// in the case of BorrowedMessage.
    fn owned_message(
        payload: Option<Vec<u8>>,
        headers: rdkafka::message::OwnedHeaders,
    ) -> OwnedMessage {
        OwnedMessage::new(
            payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(headers),
        )
    }

    #[test]
    fn test_binary_record() {
        let expected = fixtures::v10::minimal_string_extension();

        let event = EventBuilderV10::new()
            .id("0001")
            .ty("test_event.test_application")
            .source("http://localhost/")
            .extension("someint", "10")
            .build()
            .unwrap();
        let record = MessageRecord::from_event(event).unwrap();

        let message = owned_message(record.payload, record.headers);

        assert_eq!(message.to_event().unwrap(), expected)
    }

    #[test]
    fn test_structured_record() {
        let expected = fixtures::v10::full_json_data_string_extension();

        let record =
            StructuredDeserializer::deserialize_structured(expected.clone(), MessageRecord::new())
                .unwrap();

        let message = owned_message(record.payload, record.headers);

        assert_eq!(message.to_event().unwrap(), expected)
    }
}