cloudevents_sdk_rdkafka/
kafka_producer_record.rs

1use super::headers;
2use cloudevents::event::SpecVersion;
3use cloudevents::message::{
4    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
5};
6use cloudevents::Event;
7use rdkafka::message::{OwnedHeaders, ToBytes};
8use rdkafka::producer::{BaseRecord, FutureRecord};
9
10/// This struct contains a serialized CloudEvent message in the Kafka shape.
11/// Implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
12///
13/// To instantiate a new `MessageRecord` from an [`Event`],
14/// look at [`Self::from_event`] or use [`StructuredDeserializer::deserialize_structured`](cloudevents::message::StructuredDeserializer::deserialize_structured)
15/// or [`BinaryDeserializer::deserialize_binary`].
16pub struct MessageRecord {
17    pub(crate) headers: OwnedHeaders,
18    pub(crate) payload: Option<Vec<u8>>,
19}
20
21impl MessageRecord {
22    /// Create a new empty [`MessageRecord`]
23    pub fn new() -> Self {
24        MessageRecord {
25            headers: OwnedHeaders::new(),
26            payload: None,
27        }
28    }
29
30    /// Create a new [`MessageRecord`], filled with `event` serialized in binary mode.
31    pub fn from_event(event: Event) -> Result<Self> {
32        BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
33    }
34}
35
36impl Default for MessageRecord {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl BinarySerializer<MessageRecord> for MessageRecord {
43    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
44        self.headers = self
45            .headers
46            .add(headers::SPEC_VERSION_HEADER, spec_version.as_str());
47
48        Ok(self)
49    }
50
51    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
52        self.headers = self.headers.add(
53            &headers::ATTRIBUTES_TO_HEADERS
54                .get(name)
55                .ok_or(cloudevents::message::Error::UnknownAttribute {
56                    name: String::from(name),
57                })?
58                .clone()[..],
59            &value.to_string()[..],
60        );
61
62        Ok(self)
63    }
64
65    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
66        self.headers = self
67            .headers
68            .add(&attribute_name_to_header!(name)[..], &value.to_string()[..]);
69
70        Ok(self)
71    }
72
73    fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
74        self.payload = Some(bytes);
75
76        Ok(self)
77    }
78
79    fn end(self) -> Result<MessageRecord> {
80        Ok(self)
81    }
82}
83
84impl StructuredSerializer<MessageRecord> for MessageRecord {
85    fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
86        self.headers = self
87            .headers
88            .add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER);
89
90        self.payload = Some(bytes);
91
92        Ok(self)
93    }
94}
95
96/// Extension Trait for [`BaseRecord`] that fills the record with a [`MessageRecord`].
97///
98/// This trait is sealed and cannot be implemented for types outside of this crate.
99pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
100    /// Fill this [`BaseRecord`] with a [`MessageRecord`].
101    fn message_record(
102        self,
103        message_record: &'a MessageRecord,
104    ) -> Result<BaseRecord<'a, K, Vec<u8>>>;
105}
106
107impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>> {
108    fn message_record(
109        mut self,
110        message_record: &'a MessageRecord,
111    ) -> Result<BaseRecord<'a, K, Vec<u8>>> {
112        self = self.headers(message_record.headers.clone());
113
114        if let Some(s) = message_record.payload.as_ref() {
115            self = self.payload(s);
116        }
117
118        Ok(self)
119    }
120}
121
122/// Extension Trait for [`FutureRecord`] that fills the record with a [`MessageRecord`].
123///
124/// This trait is sealed and cannot be implemented for types outside of this crate.
125pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
126    /// Fill this [`FutureRecord`] with a [`MessageRecord`].
127    fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
128}
129
130impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec<u8>> {
131    fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>> {
132        self = self.headers(message_record.headers.clone());
133
134        if let Some(s) = message_record.payload.as_ref() {
135            self = self.payload(s);
136        }
137
138        self
139    }
140}
141
142mod private {
143    // Sealing the FutureRecordExt and BaseRecordExt
144    pub trait Sealed {}
145    impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
146        for rdkafka::producer::FutureRecord<'_, K, V>
147    {
148    }
149    impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
150        for rdkafka::producer::BaseRecord<'_, K, V>
151    {
152    }
153}