cloudevents_sdk_rdkafka/
kafka_producer_record.rs1use super::headers;
2use cloudevents::event::SpecVersion;
3use cloudevents::message::{
4 BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
5};
6use cloudevents::Event;
7use rdkafka::message::{OwnedHeaders, ToBytes};
8use rdkafka::producer::{BaseRecord, FutureRecord};
9
10pub struct MessageRecord {
17 pub(crate) headers: OwnedHeaders,
18 pub(crate) payload: Option<Vec<u8>>,
19}
20
21impl MessageRecord {
22 pub fn new() -> Self {
24 MessageRecord {
25 headers: OwnedHeaders::new(),
26 payload: None,
27 }
28 }
29
30 pub fn from_event(event: Event) -> Result<Self> {
32 BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
33 }
34}
35
36impl Default for MessageRecord {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl BinarySerializer<MessageRecord> for MessageRecord {
43 fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
44 self.headers = self
45 .headers
46 .add(headers::SPEC_VERSION_HEADER, spec_version.as_str());
47
48 Ok(self)
49 }
50
51 fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
52 self.headers = self.headers.add(
53 &headers::ATTRIBUTES_TO_HEADERS
54 .get(name)
55 .ok_or(cloudevents::message::Error::UnknownAttribute {
56 name: String::from(name),
57 })?
58 .clone()[..],
59 &value.to_string()[..],
60 );
61
62 Ok(self)
63 }
64
65 fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
66 self.headers = self
67 .headers
68 .add(&attribute_name_to_header!(name)[..], &value.to_string()[..]);
69
70 Ok(self)
71 }
72
73 fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
74 self.payload = Some(bytes);
75
76 Ok(self)
77 }
78
79 fn end(self) -> Result<MessageRecord> {
80 Ok(self)
81 }
82}
83
84impl StructuredSerializer<MessageRecord> for MessageRecord {
85 fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
86 self.headers = self
87 .headers
88 .add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER);
89
90 self.payload = Some(bytes);
91
92 Ok(self)
93 }
94}
95
96pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
100 fn message_record(
102 self,
103 message_record: &'a MessageRecord,
104 ) -> Result<BaseRecord<'a, K, Vec<u8>>>;
105}
106
107impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>> {
108 fn message_record(
109 mut self,
110 message_record: &'a MessageRecord,
111 ) -> Result<BaseRecord<'a, K, Vec<u8>>> {
112 self = self.headers(message_record.headers.clone());
113
114 if let Some(s) = message_record.payload.as_ref() {
115 self = self.payload(s);
116 }
117
118 Ok(self)
119 }
120}
121
122pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
126 fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
128}
129
130impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec<u8>> {
131 fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>> {
132 self = self.headers(message_record.headers.clone());
133
134 if let Some(s) = message_record.payload.as_ref() {
135 self = self.payload(s);
136 }
137
138 self
139 }
140}
141
142mod private {
143 pub trait Sealed {}
145 impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
146 for rdkafka::producer::FutureRecord<'_, K, V>
147 {
148 }
149 impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
150 for rdkafka::producer::BaseRecord<'_, K, V>
151 {
152 }
153}