use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};
use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
/// Identifier attached to a [`StreamMessage`].
///
/// Rust-side counterpart of the proto `MsgId` type: the `From` impls in this
/// file convert between the two by copying every field one-to-one.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageID {
/// Numeric id of the producer (presumably the one that published the message; confirm).
pub producer_id: u64,
/// Name of the topic; rendered by the `Display` impl.
pub topic_name: String,
/// Broker address. NOTE(review): address format is not established here, and
/// this field is not part of the `Display` output.
pub broker_addr: String,
/// Segment identifier (presumably the storage segment holding the message; confirm).
pub segment_id: u64,
/// Offset within the segment (presumably relative to `segment_id`; confirm).
pub segment_offset: u64,
}
impl Display for MessageID {
    /// Renders the id as
    /// `topic:_<topic_name>_producer:_<producer_id>_segment_id:_<segment_id>_segment_offset:_<segment_offset>`.
    ///
    /// `broker_addr` is intentionally left out of the rendered text.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Pull out only the fields that take part in the rendering.
        let Self {
            topic_name,
            producer_id,
            segment_id,
            segment_offset,
            ..
        } = self;

        write!(
            f,
            "topic:_{}_producer:_{}_segment_id:_{}_segment_offset:_{}",
            topic_name, producer_id, segment_id, segment_offset,
        )
    }
}
/// A message together with its identifier and metadata.
///
/// Rust-side counterpart of the proto `StreamMessage` type; the `From` impls
/// in this file convert between the two.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMessage {
/// Request identifier carried alongside the message.
pub request_id: u64,
/// Identifier of this message.
pub msg_id: MessageID,
/// Raw message bytes; `size()` reports the length of this buffer.
pub payload: Vec<u8>,
/// Publish timestamp. NOTE(review): unit and epoch are not established here; confirm.
pub publish_time: u64,
/// Name of the producer.
pub producer_name: String,
/// Subscription name, if one has been attached (see `add_subscription_name`).
/// Converting to the proto type maps `None` to an empty string.
pub subscription_name: Option<String>,
/// Free-form string key/value attributes.
pub attributes: HashMap<String, String>,
}
impl StreamMessage {
    /// Returns the size of the message payload in bytes.
    ///
    /// Only `payload` is counted; the id, names and attributes are not included.
    pub fn size(&self) -> usize {
        self.payload.len()
    }

    /// Attaches `subscription_name` to this message, replacing any previous value.
    ///
    /// Takes `&str` so callers can pass string literals and slices without
    /// first building a `String`; existing call sites that pass `&String`
    /// keep compiling through deref coercion. The name is copied into the
    /// message.
    pub fn add_subscription_name(&mut self, subscription_name: &str) {
        self.subscription_name = Some(subscription_name.to_owned());
    }
}
impl From<MsgId> for MessageID {
fn from(proto_msg_id: MsgId) -> Self {
MessageID {
producer_id: proto_msg_id.producer_id,
topic_name: proto_msg_id.topic_name,
broker_addr: proto_msg_id.broker_addr,
segment_id: proto_msg_id.segment_id,
segment_offset: proto_msg_id.segment_offset,
}
}
}
impl From<ProtoStreamMessage> for StreamMessage {
fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
StreamMessage {
request_id: proto_stream_msg.request_id,
msg_id: proto_stream_msg.msg_id.map_or_else(
|| panic!("Message ID cannot be None"),
|msg_id| msg_id.into(),
),
payload: proto_stream_msg.payload,
publish_time: proto_stream_msg.publish_time,
producer_name: proto_stream_msg.producer_name,
subscription_name: Some(proto_stream_msg.subscription_name),
attributes: proto_stream_msg.attributes,
}
}
}
impl From<MessageID> for MsgId {
fn from(msg_id: MessageID) -> Self {
MsgId {
producer_id: msg_id.producer_id,
topic_name: msg_id.topic_name,
broker_addr: msg_id.broker_addr,
segment_id: msg_id.segment_id,
segment_offset: msg_id.segment_offset,
}
}
}
impl From<StreamMessage> for ProtoStreamMessage {
fn from(stream_msg: StreamMessage) -> Self {
ProtoStreamMessage {
request_id: stream_msg.request_id,
msg_id: Some(stream_msg.msg_id.into()), payload: stream_msg.payload,
publish_time: stream_msg.publish_time,
producer_name: stream_msg.producer_name,
subscription_name: stream_msg.subscription_name.unwrap_or_default(),
attributes: stream_msg.attributes,
}
}
}