cloudevents/binding/rdkafka/
kafka_producer_record.rs

1use rdkafka_lib as rdkafka;
2
3use crate::binding::{
4    kafka::{header_prefix, SPEC_VERSION_HEADER},
5    CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE,
6};
7use crate::event::SpecVersion;
8use crate::message::{
9    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
10};
11use crate::Event;
12use rdkafka::message::{Header, OwnedHeaders, ToBytes};
13use rdkafka::producer::{BaseRecord, FutureRecord};
14
15/// This struct contains a serialized CloudEvent message in the Kafka shape.
16/// Implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
17///
18/// To instantiate a new `MessageRecord` from an [`Event`],
19/// look at [`Self::from_event`] or use [`StructuredDeserializer::deserialize_structured`](crate::message::StructuredDeserializer::deserialize_structured)
20/// or [`BinaryDeserializer::deserialize_binary`].
21pub struct MessageRecord {
22    pub(crate) headers: OwnedHeaders,
23    pub(crate) payload: Option<Vec<u8>>,
24}
25
26impl MessageRecord {
27    /// Create a new empty [`MessageRecord`]
28    pub fn new() -> Self {
29        MessageRecord {
30            headers: OwnedHeaders::new(),
31            payload: None,
32        }
33    }
34
35    /// Create a new [`MessageRecord`], filled with `event` serialized in binary mode.
36    pub fn from_event(event: Event) -> Result<Self> {
37        BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
38    }
39}
40
41impl Default for MessageRecord {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl BinarySerializer<MessageRecord> for MessageRecord {
48    fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
49        let v = sv.to_string();
50        let header = Header {
51            key: SPEC_VERSION_HEADER,
52            value: Some(&v),
53        };
54        self.headers = self.headers.insert(header);
55        Ok(self)
56    }
57
58    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
59        let v = value.to_string();
60        let header = Header {
61            key: &header_prefix(name),
62            value: Some(&v),
63        };
64        self.headers = self.headers.insert(header);
65        Ok(self)
66    }
67
68    fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
69        self.set_attribute(name, value)
70    }
71
72    fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
73        self.payload = Some(bytes);
74        Ok(self)
75    }
76
77    fn end(self) -> Result<MessageRecord> {
78        Ok(self)
79    }
80}
81
82impl StructuredSerializer<MessageRecord> for MessageRecord {
83    fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
84        let header = Header {
85            key: CONTENT_TYPE,
86            value: Some(CLOUDEVENTS_JSON_HEADER),
87        };
88        self.headers = self.headers.insert(header);
89
90        self.payload = Some(bytes);
91
92        Ok(self)
93    }
94}
95
96/// Extension Trait for [`BaseRecord`] that fills the record with a [`MessageRecord`].
97///
98/// This trait is sealed and cannot be implemented for types outside of this crate.
99pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
100    /// Fill this [`BaseRecord`] with a [`MessageRecord`].
101    fn message_record(
102        self,
103        message_record: &'a MessageRecord,
104    ) -> Result<BaseRecord<'a, K, Vec<u8>>>;
105}
106
107impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>> {
108    fn message_record(
109        mut self,
110        message_record: &'a MessageRecord,
111    ) -> Result<BaseRecord<'a, K, Vec<u8>>> {
112        self = self.headers(message_record.headers.clone());
113
114        if let Some(s) = message_record.payload.as_ref() {
115            self = self.payload(s);
116        }
117
118        Ok(self)
119    }
120}
121
122/// Extension Trait for [`FutureRecord`] that fills the record with a [`MessageRecord`].
123///
124/// This trait is sealed and cannot be implemented for types outside of this crate.
125pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
126    /// Fill this [`FutureRecord`] with a [`MessageRecord`].
127    fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
128}
129
130impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec<u8>> {
131    fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>> {
132        self = self.headers(message_record.headers.clone());
133
134        if let Some(s) = message_record.payload.as_ref() {
135            self = self.payload(s);
136        }
137
138        self
139    }
140}
141
142mod private {
143    use rdkafka_lib as rdkafka;
144
145    // Sealing the FutureRecordExt and BaseRecordExt
146    pub trait Sealed {}
147    impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
148        for rdkafka::producer::FutureRecord<'_, K, V>
149    {
150    }
151    impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
152        for rdkafka::producer::BaseRecord<'_, K, V>
153    {
154    }
155}