use crate::serde_utils::{bytes_serde, option_bytes_serde};
use crate::transaction::TransactionMarker;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// A single record: an optional key, a payload, a timestamp, headers, and
/// optional producer/transaction metadata.
///
/// The type derives `Serialize`/`Deserialize`; field order is part of the
/// serialized layout for positional formats, so do not reorder fields casually.
///
/// NOTE(review): several fields carry `#[serde(default)]`, which only applies
/// when a format can report a field as absent. `Message::to_bytes` /
/// `Message::from_bytes` use postcard — confirm the defaults take effect there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Offset of the message. Every constructor on `Message` initializes it to 0.
/// NOTE(review): presumably assigned by the log on append — confirm.
pub offset: u64,
/// Optional message key, (de)serialized through `option_bytes_serde`.
#[serde(with = "option_bytes_serde")]
pub key: Option<Bytes>,
/// Message payload, (de)serialized through `bytes_serde`.
#[serde(with = "bytes_serde")]
pub value: Bytes,
/// UTC timestamp; the constructors on `Message` set it to `Utc::now()`.
pub timestamp: DateTime<Utc>,
/// Header entries as `(name, raw bytes)` pairs, in insertion order.
pub headers: Vec<(String, Vec<u8>)>,
/// Producer identifier, if any. Falls back to `None` when serde reports the
/// field as missing.
#[serde(default)]
pub producer_id: Option<u64>,
/// Producer epoch, if any. Falls back to `None` when serde reports the field
/// as missing.
#[serde(default)]
pub producer_epoch: Option<u16>,
/// Transaction marker; a message with `Some(_)` here is treated as a control
/// record by `Message::is_control_record`. Falls back to `None` when missing.
#[serde(default)]
pub transaction_marker: Option<TransactionMarker>,
/// Whether the message was produced as part of a transaction. Falls back to
/// `false` when serde reports the field as missing.
#[serde(default)]
pub is_transactional: bool,
}
impl Message {
    /// Builds a keyless, non-transactional message carrying `value`.
    ///
    /// The offset starts at 0, the timestamp is taken from `Utc::now()`, and
    /// the headers, producer fields and transaction marker start out empty.
    pub fn new(value: Bytes) -> Self {
        Self {
            offset: 0,
            key: None,
            value,
            timestamp: Utc::now(),
            headers: Vec::new(),
            producer_id: None,
            producer_epoch: None,
            transaction_marker: None,
            is_transactional: false,
        }
    }

    /// Builds a non-transactional message with both a `key` and a `value`.
    ///
    /// All other fields are initialized exactly as in [`Message::new`].
    pub fn with_key(key: Bytes, value: Bytes) -> Self {
        let mut message = Self::new(value);
        message.key = Some(key);
        message
    }

    /// Builds a keyless transactional message attributed to the given producer.
    ///
    /// `producer_id` and `producer_epoch` are stored as `Some(..)`,
    /// `is_transactional` is `true`, and no transaction marker is set.
    pub fn transactional(value: Bytes, producer_id: u64, producer_epoch: u16) -> Self {
        Self::new(value).with_producer(producer_id, producer_epoch, true)
    }

    /// Builds a keyed transactional message attributed to the given producer.
    ///
    /// Same as [`Message::transactional`], except that `key` is stored as well.
    pub fn transactional_with_key(
        key: Bytes,
        value: Bytes,
        producer_id: u64,
        producer_epoch: u16,
    ) -> Self {
        Self::with_key(key, value).with_producer(producer_id, producer_epoch, true)
    }

    /// Builds a control record carrying `marker` for the given producer.
    ///
    /// The record has no key, an empty payload, `is_transactional == true`,
    /// and `transaction_marker == Some(marker)`.
    pub fn control_record(
        marker: TransactionMarker,
        producer_id: u64,
        producer_epoch: u16,
    ) -> Self {
        let mut record = Self::transactional(Bytes::new(), producer_id, producer_epoch);
        record.transaction_marker = Some(marker);
        record
    }

    /// Returns `true` when the message carries a transaction marker.
    pub fn is_control_record(&self) -> bool {
        self.transaction_marker.is_some()
    }

    /// Returns `true` when the message counts as committed.
    ///
    /// Non-transactional messages are always committed. A transactional
    /// message is committed only if its marker is `TransactionMarker::Commit`;
    /// with no marker, or any other marker, this returns `false`.
    pub fn is_committed(&self) -> bool {
        if !self.is_transactional {
            return true;
        }
        matches!(self.transaction_marker, Some(TransactionMarker::Commit))
    }

    /// Appends a `(key, value)` header entry and returns the message, so calls
    /// can be chained builder-style.
    pub fn add_header(mut self, key: String, value: Vec<u8>) -> Self {
        let entry = (key, value);
        self.headers.push(entry);
        self
    }

    /// Attaches producer information and returns the message.
    ///
    /// Overwrites `producer_id` and `producer_epoch` with `Some(..)` and sets
    /// `is_transactional` to `transactional` (replacing any previous value).
    pub fn with_producer(
        mut self,
        producer_id: u64,
        producer_epoch: u16,
        transactional: bool,
    ) -> Self {
        self.is_transactional = transactional;
        self.producer_epoch = Some(producer_epoch);
        self.producer_id = Some(producer_id);
        self
    }

    /// Serializes the message into a freshly allocated byte vector using postcard.
    ///
    /// # Errors
    ///
    /// Propagates the postcard serialization error, converted into the
    /// crate's error type.
    pub fn to_bytes(&self) -> crate::Result<Vec<u8>> {
        let encoded = postcard::to_allocvec(self)?;
        Ok(encoded)
    }

    /// Deserializes a message from postcard-encoded `data`.
    ///
    /// # Errors
    ///
    /// Propagates the postcard deserialization error, converted into the
    /// crate's error type.
    pub fn from_bytes(data: &[u8]) -> crate::Result<Self> {
        let decoded: Self = postcard::from_bytes(data)?;
        Ok(decoded)
    }
}