1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt::{self, Display, Formatter};
4
5use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
6
7#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct MessageID {
14 pub producer_id: u64,
16 pub topic_name: String,
19 pub broker_addr: String,
22 pub topic_offset: u64,
24}
25
26impl Display for MessageID {
27 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
28 write!(
29 f,
30 "topic:_{}_producer:_{}_topic_offset:_{}",
31 self.topic_name, self.producer_id, self.topic_offset,
32 )
33 }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct StreamMessage {
38 pub request_id: u64,
40 pub msg_id: MessageID,
42 pub payload: Vec<u8>,
44 pub publish_time: u64,
46 pub producer_name: String,
48 pub subscription_name: Option<String>,
51 pub attributes: HashMap<String, String>,
53}
54
55impl StreamMessage {
56 pub fn size(&self) -> usize {
57 self.payload.len()
58 }
59 pub fn add_subscription_name(&mut self, subscription_name: &String) {
60 self.subscription_name = Some(subscription_name.into());
61 }
62}
63
64impl From<MsgId> for MessageID {
65 fn from(proto_msg_id: MsgId) -> Self {
66 MessageID {
67 producer_id: proto_msg_id.producer_id,
68 topic_name: proto_msg_id.topic_name,
69 broker_addr: proto_msg_id.broker_addr,
70 topic_offset: proto_msg_id.topic_offset,
71 }
72 }
73}
74
75impl From<ProtoStreamMessage> for StreamMessage {
76 fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
77 StreamMessage {
78 request_id: proto_stream_msg.request_id,
79 msg_id: proto_stream_msg.msg_id.map_or_else(
80 || panic!("Message ID cannot be None"),
81 |msg_id| msg_id.into(),
82 ),
83 payload: proto_stream_msg.payload,
84 publish_time: proto_stream_msg.publish_time,
85 producer_name: proto_stream_msg.producer_name,
86 subscription_name: Some(proto_stream_msg.subscription_name),
87 attributes: proto_stream_msg.attributes,
88 }
89 }
90}
91
92impl From<MessageID> for MsgId {
93 fn from(msg_id: MessageID) -> Self {
94 MsgId {
95 producer_id: msg_id.producer_id,
96 topic_name: msg_id.topic_name,
97 broker_addr: msg_id.broker_addr,
98 topic_offset: msg_id.topic_offset,
99 }
100 }
101}
102
103impl From<StreamMessage> for ProtoStreamMessage {
104 fn from(stream_msg: StreamMessage) -> Self {
105 ProtoStreamMessage {
106 request_id: stream_msg.request_id,
107 msg_id: Some(stream_msg.msg_id.into()), payload: stream_msg.payload,
109 publish_time: stream_msg.publish_time,
110 producer_name: stream_msg.producer_name,
111 subscription_name: stream_msg.subscription_name.unwrap_or_default(),
112 attributes: stream_msg.attributes,
113 }
114 }
115}