cloudevents/binding/rdkafka/
kafka_producer_record.rs1use rdkafka_lib as rdkafka;
2
3use crate::binding::{
4 kafka::{header_prefix, SPEC_VERSION_HEADER},
5 CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE,
6};
7use crate::event::SpecVersion;
8use crate::message::{
9 BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
10};
11use crate::Event;
12use rdkafka::message::{Header, OwnedHeaders, ToBytes};
13use rdkafka::producer::{BaseRecord, FutureRecord};
14
15pub struct MessageRecord {
22 pub(crate) headers: OwnedHeaders,
23 pub(crate) payload: Option<Vec<u8>>,
24}
25
26impl MessageRecord {
27 pub fn new() -> Self {
29 MessageRecord {
30 headers: OwnedHeaders::new(),
31 payload: None,
32 }
33 }
34
35 pub fn from_event(event: Event) -> Result<Self> {
37 BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
38 }
39}
40
41impl Default for MessageRecord {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl BinarySerializer<MessageRecord> for MessageRecord {
48 fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
49 let v = sv.to_string();
50 let header = Header {
51 key: SPEC_VERSION_HEADER,
52 value: Some(&v),
53 };
54 self.headers = self.headers.insert(header);
55 Ok(self)
56 }
57
58 fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
59 let v = value.to_string();
60 let header = Header {
61 key: &header_prefix(name),
62 value: Some(&v),
63 };
64 self.headers = self.headers.insert(header);
65 Ok(self)
66 }
67
68 fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
69 self.set_attribute(name, value)
70 }
71
72 fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
73 self.payload = Some(bytes);
74 Ok(self)
75 }
76
77 fn end(self) -> Result<MessageRecord> {
78 Ok(self)
79 }
80}
81
82impl StructuredSerializer<MessageRecord> for MessageRecord {
83 fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
84 let header = Header {
85 key: CONTENT_TYPE,
86 value: Some(CLOUDEVENTS_JSON_HEADER),
87 };
88 self.headers = self.headers.insert(header);
89
90 self.payload = Some(bytes);
91
92 Ok(self)
93 }
94}
95
96pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
100 fn message_record(
102 self,
103 message_record: &'a MessageRecord,
104 ) -> Result<BaseRecord<'a, K, Vec<u8>>>;
105}
106
107impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>> {
108 fn message_record(
109 mut self,
110 message_record: &'a MessageRecord,
111 ) -> Result<BaseRecord<'a, K, Vec<u8>>> {
112 self = self.headers(message_record.headers.clone());
113
114 if let Some(s) = message_record.payload.as_ref() {
115 self = self.payload(s);
116 }
117
118 Ok(self)
119 }
120}
121
122pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
126 fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
128}
129
130impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec<u8>> {
131 fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>> {
132 self = self.headers(message_record.headers.clone());
133
134 if let Some(s) = message_record.payload.as_ref() {
135 self = self.payload(s);
136 }
137
138 self
139 }
140}
141
142mod private {
143 use rdkafka_lib as rdkafka;
144
145 pub trait Sealed {}
147 impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
148 for rdkafka::producer::FutureRecord<'_, K, V>
149 {
150 }
151 impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
152 for rdkafka::producer::BaseRecord<'_, K, V>
153 {
154 }
155}