1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt::{self, Display, Formatter};
4
5use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
6
7#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct MessageID {
14 pub producer_id: u64,
16 pub topic_name: String,
19 pub broker_addr: String,
22 pub topic_offset: u64,
24}
25
26impl Display for MessageID {
27 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
28 write!(
29 f,
30 "topic:_{}_producer:_{}_topic_offset:_{}",
31 self.topic_name, self.producer_id, self.topic_offset,
32 )
33 }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct StreamMessage {
38 pub request_id: u64,
40 pub msg_id: MessageID,
42 pub payload: Vec<u8>,
44 pub publish_time: u64,
46 pub producer_name: String,
48 pub subscription_name: Option<String>,
51 pub attributes: HashMap<String, String>,
53 pub schema_id: Option<u64>,
55 pub schema_version: Option<u32>,
56}
57
58impl StreamMessage {
59 pub fn size(&self) -> usize {
60 self.payload.len()
61 }
62 pub fn add_subscription_name(&mut self, subscription_name: &String) {
63 self.subscription_name = Some(subscription_name.into());
64 }
65}
66
67impl From<MsgId> for MessageID {
68 fn from(proto_msg_id: MsgId) -> Self {
69 MessageID {
70 producer_id: proto_msg_id.producer_id,
71 topic_name: proto_msg_id.topic_name,
72 broker_addr: proto_msg_id.broker_addr,
73 topic_offset: proto_msg_id.topic_offset,
74 }
75 }
76}
77
78impl From<ProtoStreamMessage> for StreamMessage {
79 fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
80 StreamMessage {
81 request_id: proto_stream_msg.request_id,
82 msg_id: proto_stream_msg.msg_id.map_or_else(
83 || panic!("Message ID cannot be None"),
84 |msg_id| msg_id.into(),
85 ),
86 payload: proto_stream_msg.payload,
87 publish_time: proto_stream_msg.publish_time,
88 producer_name: proto_stream_msg.producer_name,
89 subscription_name: Some(proto_stream_msg.subscription_name),
90 attributes: proto_stream_msg.attributes,
91 schema_id: proto_stream_msg.schema_id,
92 schema_version: proto_stream_msg.schema_version,
93 }
94 }
95}
96
97impl From<MessageID> for MsgId {
98 fn from(msg_id: MessageID) -> Self {
99 MsgId {
100 producer_id: msg_id.producer_id,
101 topic_name: msg_id.topic_name,
102 broker_addr: msg_id.broker_addr,
103 topic_offset: msg_id.topic_offset,
104 }
105 }
106}
107
108impl From<StreamMessage> for ProtoStreamMessage {
109 fn from(stream_msg: StreamMessage) -> Self {
110 ProtoStreamMessage {
111 request_id: stream_msg.request_id,
112 msg_id: Some(stream_msg.msg_id.into()), payload: stream_msg.payload,
114 publish_time: stream_msg.publish_time,
115 producer_name: stream_msg.producer_name,
116 subscription_name: stream_msg.subscription_name.unwrap_or_default(),
117 attributes: stream_msg.attributes,
118 schema_id: stream_msg.schema_id,
119 schema_version: stream_msg.schema_version,
120 }
121 }
122}