1use bytes::Bytes;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::fmt::{self, Display, Formatter};
5
6use crate::proto::{MsgId, StreamMessage as ProtoStreamMessage};
7
/// Identifies a message by producer id, topic name, broker address and topic offset.
///
/// Carries the same four fields as the protobuf `MsgId`; the `From` impls in this
/// file convert between the two by moving each field across unchanged.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageID {
    /// Numeric producer id — presumably the producer that published the message; confirm.
    pub producer_id: u64,
    /// Name of the topic; appears in the `Display` output.
    pub topic_name: String,
    /// Broker address — NOTE(review): exact format (host:port, URL, ...) is not visible here.
    /// Not included in the `Display` output.
    pub broker_addr: String,
    /// Offset within the topic — presumably the message's position; confirm.
    pub topic_offset: u64,
}
26
27impl Display for MessageID {
28 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
29 write!(
30 f,
31 "topic:_{}_producer:_{}_topic_offset:_{}",
32 self.topic_name, self.producer_id, self.topic_offset,
33 )
34 }
35}
36
/// A message together with its identity and metadata.
///
/// Converts to and from the protobuf `StreamMessage` through the `From` impls in
/// this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMessage {
    /// Request identifier; copied verbatim to and from the proto message.
    pub request_id: u64,
    /// Identity of this message (producer id, topic name, broker address, topic offset).
    pub msg_id: MessageID,
    /// Message body; `size()` reports its length.
    pub payload: Bytes,
    /// Publish timestamp — NOTE(review): unit and epoch are not visible here; confirm.
    pub publish_time: u64,
    /// Name of the producer; also the fallback value of `effective_routing_key()`.
    pub producer_name: String,
    /// Subscription name, if any. Set by `add_subscription_name()`; written to the
    /// proto message as an empty string when `None`.
    pub subscription_name: Option<String>,
    /// String key/value attributes attached to the message.
    pub attributes: HashMap<String, String>,
    /// Optional schema id — presumably identifies the payload's schema; confirm.
    pub schema_id: Option<u64>,
    /// Optional schema version — presumably accompanies `schema_id`; confirm.
    pub schema_version: Option<u32>,
    /// Optional explicit routing key; when `None`, `effective_routing_key()` uses
    /// `producer_name` instead.
    pub routing_key: Option<String>,
}
60
61impl StreamMessage {
62 pub fn size(&self) -> usize {
63 self.payload.len()
64 }
65 pub fn add_subscription_name(&mut self, subscription_name: &String) {
66 self.subscription_name = Some(subscription_name.into());
67 }
68 pub fn effective_routing_key(&self) -> &str {
71 self.routing_key.as_deref().unwrap_or(&self.producer_name)
72 }
73}
74
75impl From<MsgId> for MessageID {
76 fn from(proto_msg_id: MsgId) -> Self {
77 MessageID {
78 producer_id: proto_msg_id.producer_id,
79 topic_name: proto_msg_id.topic_name,
80 broker_addr: proto_msg_id.broker_addr,
81 topic_offset: proto_msg_id.topic_offset,
82 }
83 }
84}
85
86impl From<ProtoStreamMessage> for StreamMessage {
87 fn from(proto_stream_msg: ProtoStreamMessage) -> Self {
88 StreamMessage {
89 request_id: proto_stream_msg.request_id,
90 msg_id: proto_stream_msg.msg_id.map_or_else(
91 || panic!("Message ID cannot be None"),
92 |msg_id| msg_id.into(),
93 ),
94 payload: proto_stream_msg.payload.into(),
95 publish_time: proto_stream_msg.publish_time,
96 producer_name: proto_stream_msg.producer_name,
97 subscription_name: Some(proto_stream_msg.subscription_name),
98 attributes: proto_stream_msg.attributes,
99 schema_id: proto_stream_msg.schema_id,
100 schema_version: proto_stream_msg.schema_version,
101 routing_key: proto_stream_msg.routing_key,
102 }
103 }
104}
105
106impl From<MessageID> for MsgId {
107 fn from(msg_id: MessageID) -> Self {
108 MsgId {
109 producer_id: msg_id.producer_id,
110 topic_name: msg_id.topic_name,
111 broker_addr: msg_id.broker_addr,
112 topic_offset: msg_id.topic_offset,
113 }
114 }
115}
116
117impl From<StreamMessage> for ProtoStreamMessage {
118 fn from(stream_msg: StreamMessage) -> Self {
119 ProtoStreamMessage {
120 request_id: stream_msg.request_id,
121 msg_id: Some(stream_msg.msg_id.into()), payload: stream_msg.payload.to_vec(),
123 publish_time: stream_msg.publish_time,
124 producer_name: stream_msg.producer_name,
125 subscription_name: stream_msg.subscription_name.unwrap_or_default(),
126 attributes: stream_msg.attributes,
127 schema_id: stream_msg.schema_id,
128 schema_version: stream_msg.schema_version,
129 routing_key: stream_msg.routing_key,
130 }
131 }
132}