use rdkafka_lib as rdkafka;
use super::headers;
use crate::event::SpecVersion;
use crate::message::{
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use rdkafka::message::{OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, FutureRecord};
pub struct MessageRecord {
pub(crate) headers: OwnedHeaders,
pub(crate) payload: Option<Vec<u8>>,
}
impl MessageRecord {
pub fn new() -> Self {
MessageRecord {
headers: OwnedHeaders::new(),
payload: None,
}
}
pub fn from_event(event: Event) -> Result<Self> {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
}
}
impl Default for MessageRecord {
fn default() -> Self {
Self::new()
}
}
impl BinarySerializer<MessageRecord> for MessageRecord {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
self.headers = self
.headers
.add(headers::SPEC_VERSION_HEADER, spec_version.as_str());
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.headers = self.headers.add(
&headers::ATTRIBUTES_TO_HEADERS
.get(name)
.ok_or(crate::message::Error::UnknownAttribute {
name: String::from(name),
})?
.clone()[..],
&value.to_string()[..],
);
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.headers = self
.headers
.add(&attribute_name_to_header!(name)[..], &value.to_string()[..]);
Ok(self)
}
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
self.payload = Some(bytes);
Ok(self)
}
fn end(self) -> Result<MessageRecord> {
Ok(self)
}
}
impl StructuredSerializer<MessageRecord> for MessageRecord {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
self.headers = self
.headers
.add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER);
self.payload = Some(bytes);
Ok(self)
}
}
pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
fn message_record(
self,
message_record: &'a MessageRecord,
) -> Result<BaseRecord<'a, K, Vec<u8>>>;
}
impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>> {
fn message_record(
mut self,
message_record: &'a MessageRecord,
) -> Result<BaseRecord<'a, K, Vec<u8>>> {
self = self.headers(message_record.headers.clone());
if let Some(s) = message_record.payload.as_ref() {
self = self.payload(s);
}
Ok(self)
}
}
pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
}
impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec<u8>> {
fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>> {
self = self.headers(message_record.headers.clone());
if let Some(s) = message_record.payload.as_ref() {
self = self.payload(s);
}
self
}
}
mod private {
use rdkafka_lib as rdkafka;
pub trait Sealed {}
impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
for rdkafka::producer::FutureRecord<'_, K, V>
{
}
impl<K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed
for rdkafka::producer::BaseRecord<'_, K, V>
{
}
}