rocketmq_common/common/message/
message_ext_broker_inner.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::fmt::Formatter;
21use std::net::SocketAddr;
22
23use bytes::Bytes;
24use cheetah_string::CheetahString;
25use rocketmq_rust::ArcMut;
26
27use crate::common::hasher::string_hasher::JavaStringHasher;
28use crate::common::message::message_ext::MessageExt;
29use crate::common::message::message_single::Message;
30use crate::common::message::MessageTrait;
31use crate::common::message::MessageVersion;
32use crate::common::TopicFilterType;
33use crate::MessageUtils;
34
35#[derive(Default)]
36pub struct MessageExtBrokerInner {
37 pub message_ext_inner: MessageExt,
38 pub properties_string: CheetahString,
39 pub tags_code: i64,
40 pub encoded_buff: Option<bytes::BytesMut>,
41 pub encode_completed: bool,
42 pub version: MessageVersion,
43}
44
45impl MessageExtBrokerInner {
46 const VERSION: MessageVersion = MessageVersion::V1;
47
48 #[inline]
49 pub fn delete_property(&mut self, name: impl Into<CheetahString>) {
50 let name = name.into();
51 self.message_ext_inner.message.clear_property(name.as_str());
52 self.properties_string = CheetahString::from_string(MessageUtils::delete_property(
53 self.properties_string.as_str(),
54 name.as_str(),
55 ));
56 }
57
58 #[inline]
59 pub fn with_version(&mut self, version: MessageVersion) {
60 self.version = version;
61 }
62
63 #[inline]
64 pub fn version(&self) -> MessageVersion {
65 self.version
66 }
67
68 #[inline]
69 pub fn topic(&self) -> &CheetahString {
70 self.message_ext_inner.topic()
71 }
72
73 #[inline]
74 pub fn get_topic(&self) -> &CheetahString {
75 self.message_ext_inner.topic()
76 }
77
78 #[inline]
79 pub fn born_host(&self) -> SocketAddr {
80 self.message_ext_inner.born_host()
81 }
82
83 #[inline]
84 pub fn store_host(&self) -> SocketAddr {
85 self.message_ext_inner.store_host()
86 }
87
88 #[inline]
89 pub fn with_born_host_v6_flag(&mut self) {
90 self.message_ext_inner.with_born_host_v6_flag()
91 }
92
93 #[inline]
94 pub fn with_store_host_v6_flag(&mut self) {
95 self.message_ext_inner.with_store_host_v6_flag()
96 }
97
98 #[inline]
99 pub fn body(&self) -> Option<bytes::Bytes> {
100 self.message_ext_inner.body()
101 }
102
103 #[inline]
104 pub fn sys_flag(&self) -> i32 {
105 self.message_ext_inner.sys_flag()
106 }
107
108 #[inline]
109 pub fn body_crc(&self) -> u32 {
110 self.message_ext_inner.body_crc()
111 }
112
113 #[inline]
114 pub fn queue_id(&self) -> i32 {
115 self.message_ext_inner.queue_id()
116 }
117
118 #[inline]
119 pub fn flag(&self) -> i32 {
120 self.message_ext_inner.flag()
121 }
122
123 #[inline]
124 pub fn born_timestamp(&self) -> i64 {
125 self.message_ext_inner.born_timestamp()
126 }
127
128 #[inline]
129 pub fn store_timestamp(&self) -> i64 {
130 self.message_ext_inner.store_timestamp()
131 }
132
133 #[inline]
134 pub fn born_host_bytes(&self) -> bytes::Bytes {
135 self.message_ext_inner.born_host_bytes()
136 }
137
138 #[inline]
139 pub fn store_host_bytes(&self) -> bytes::Bytes {
140 self.message_ext_inner.born_store_bytes()
141 }
142
143 #[inline]
144 pub fn reconsume_times(&self) -> i32 {
145 self.message_ext_inner.reconsume_times()
146 }
147
148 #[inline]
149 pub fn prepared_transaction_offset(&self) -> i64 {
150 self.message_ext_inner.prepared_transaction_offset()
151 }
152
153 #[inline]
154 pub fn property(&self, name: &str) -> Option<CheetahString> {
155 self.message_ext_inner.properties().get(name).cloned()
156 }
157
158 #[inline]
159 pub fn properties_string(&self) -> &str {
160 self.properties_string.as_str()
161 }
162
163 #[inline]
164 pub fn queue_offset(&self) -> i64 {
165 self.message_ext_inner.queue_offset()
166 }
167
168 #[inline]
169 pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> i64 {
170 if tags.is_empty() {
171 return 0;
172 }
173 JavaStringHasher::hash_str(tags) as i64
174 }
175
176 #[inline]
177 pub fn tags_string_to_tags_code(tags: &str) -> i64 {
178 if tags.is_empty() {
179 return 0;
180 }
181 JavaStringHasher::hash_str(tags) as i64
182 }
183
184 #[inline]
185 pub fn get_tags(&self) -> Option<CheetahString> {
186 self.message_ext_inner.get_tags()
187 }
188
189 #[inline]
190 pub fn get_transaction_id(&self) -> Option<&CheetahString> {
191 self.message_ext_inner.get_transaction_id()
192 }
193
194 #[inline]
195 pub fn is_wait_store_msg_ok(&self) -> bool {
196 self.message_ext_inner.message.is_wait_store_msg_ok()
197 }
198
199 #[inline]
200 pub fn body_len(&self) -> usize {
201 self.message_ext_inner.message.get_body().unwrap().len()
202 }
203}
204
205impl fmt::Display for MessageExtBrokerInner {
206 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207 let encoded_buff_str = match &self.encoded_buff {
208 Some(encoded_buff) =>
209 {
211 "****".to_string()
212 }
213 None => "None".to_string(),
214 };
215
216 write!(
217 f,
218 "MessageExtBrokerInner {{ message_ext_inner: {}, properties_string: {}, tags_code: {}, encoded_buff: {}, \
219 encode_completed: {}, version: {} }}",
220 self.message_ext_inner,
221 self.properties_string,
222 self.tags_code,
223 encoded_buff_str,
224 self.encode_completed,
225 self.version
226 )
227 }
228}
229
230impl Debug for MessageExtBrokerInner {
231 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
232 let encoded_buff_str = match &self.encoded_buff {
233 Some(encoded_buff) =>
234 {
236 "****".to_string()
237 }
238 None => "None".to_string(),
239 };
240
241 write!(
242 f,
243 "MessageExtBrokerInner {{ message_ext_inner: {:?}, properties_string: {}, tags_code: {}, encoded_buff: \
244 {}, encode_completed: {}, version: {} }}",
245 self.message_ext_inner,
246 self.properties_string,
247 self.tags_code,
248 encoded_buff_str,
249 self.encode_completed,
250 self.version
251 )
252 }
253}
254
255impl MessageTrait for MessageExtBrokerInner {
256 #[inline]
257 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
258 self.message_ext_inner.put_property(key, value);
259 }
260
261 #[inline]
262 fn clear_property(&mut self, name: &str) {
263 self.message_ext_inner.clear_property(name);
264 }
265
266 #[inline]
267 fn property(&self, name: &CheetahString) -> Option<CheetahString> {
268 self.message_ext_inner.property(name)
269 }
270
271 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
272 self.message_ext_inner.property_ref(name)
273 }
274
275 #[inline]
276 fn topic(&self) -> &CheetahString {
277 self.message_ext_inner.topic()
278 }
279
280 #[inline]
281 fn set_topic(&mut self, topic: CheetahString) {
282 self.message_ext_inner.set_topic(topic);
283 }
284
285 #[inline]
286 fn get_flag(&self) -> i32 {
287 self.message_ext_inner.get_flag()
288 }
289
290 #[inline]
291 fn set_flag(&mut self, flag: i32) {
292 self.message_ext_inner.set_flag(flag);
293 }
294
295 #[inline]
296 fn get_body(&self) -> Option<&Bytes> {
297 self.message_ext_inner.get_body()
298 }
299
300 #[inline]
301 fn set_body(&mut self, body: Bytes) {
302 self.message_ext_inner.set_body(body);
303 }
304
305 #[inline]
306 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
307 self.message_ext_inner.get_properties()
308 }
309
310 #[inline]
311 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
312 self.message_ext_inner.set_properties(properties);
313 }
314
315 #[inline]
316 fn transaction_id(&self) -> Option<&CheetahString> {
317 self.message_ext_inner.transaction_id()
318 }
319
320 #[inline]
321 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
322 self.message_ext_inner.set_transaction_id(transaction_id);
323 }
324
325 #[inline]
326 fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
327 self.message_ext_inner.get_compressed_body_mut()
328 }
329
330 #[inline]
331 fn get_compressed_body(&self) -> Option<&Bytes> {
332 self.message_ext_inner.get_compressed_body()
333 }
334
335 #[inline]
336 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
337 self.message_ext_inner.set_compressed_body_mut(compressed_body);
338 }
339
340 #[inline]
341 fn take_body(&mut self) -> Option<Bytes> {
342 self.message_ext_inner.take_body()
343 }
344
345 #[inline]
346 fn as_any(&self) -> &dyn Any {
347 self
348 }
349
350 #[inline]
351 fn as_any_mut(&mut self) -> &mut dyn Any {
352 self
353 }
354}
355
356impl From<MessageExtBrokerInner> for crate::common::message::broker_message::BrokerMessage {
358 fn from(inner: MessageExtBrokerInner) -> Self {
359 use crate::common::message::broker_message::BrokerMessage;
360 use crate::common::message::message_envelope::MessageEnvelope;
361
362 let envelope = MessageEnvelope::from(inner.message_ext_inner);
363
364 BrokerMessage::new(envelope, inner.tags_code, inner.properties_string, inner.version)
365 }
366}
367
368impl From<crate::common::message::broker_message::BrokerMessage> for MessageExtBrokerInner {
370 fn from(broker_msg: crate::common::message::broker_message::BrokerMessage) -> Self {
371 use crate::common::message::message_ext::MessageExt;
372
373 let message_ext_inner = MessageExt::from(broker_msg.envelope().clone());
374
375 Self {
376 message_ext_inner,
377 properties_string: CheetahString::from_string(broker_msg.properties_string().to_string()),
378 tags_code: broker_msg.tags_code(),
379 encoded_buff: broker_msg.encoded_buff().cloned(),
380 encode_completed: broker_msg.is_encode_completed(),
381 version: broker_msg.version(),
382 }
383 }
384}
385
386#[cfg(test)]
387mod tests {
388 use super::MessageExtBrokerInner;
389 use crate::common::message::MessageTrait;
390 use cheetah_string::CheetahString;
391
392 #[test]
393 fn message_ext_broker_inner_get_transaction_id_delegates_to_inner_message() {
394 let mut message = MessageExtBrokerInner::default();
395 MessageTrait::set_transaction_id(&mut message, CheetahString::from("tx-123"));
396
397 assert_eq!(message.get_transaction_id(), Some(&CheetahString::from("tx-123")));
398 }
399}