rocketmq_common/common/message/
message_single.rs1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::hash::DefaultHasher;
23use std::hash::Hash;
24use std::hash::Hasher;
25use std::net::SocketAddr;
26
27use bytes::Buf;
28use bytes::BufMut;
29use bytes::Bytes;
30use cheetah_string::CheetahString;
31
32use crate::common::hasher::string_hasher::JavaStringHasher;
33use crate::common::message::message_ext::MessageExt;
34use crate::common::message::MessageConst;
35use crate::common::message::MessageTrait;
36use crate::common::message::MessageVersion;
37use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
38use crate::common::TopicFilterType;
39use crate::MessageUtils;
40
41#[derive(Clone, Debug)]
42pub struct Message {
43 pub topic: CheetahString,
44 pub flag: i32,
45 pub properties: HashMap<CheetahString, CheetahString>,
46 pub body: Option<bytes::Bytes>,
48 pub compressed_body: Option<bytes::Bytes>,
50 pub transaction_id: Option<CheetahString>,
51}
52
53impl Default for Message {
54 fn default() -> Self {
55 Self {
56 topic: CheetahString::new(),
57 flag: 0,
58 properties: HashMap::new(),
59 body: None,
60 compressed_body: None,
61 transaction_id: None,
62 }
63 }
64}
65
66impl Message {
67 pub fn new(topic: impl Into<CheetahString>, body: &[u8]) -> Self {
68 Self::with_details(
69 topic,
70 CheetahString::new(),
71 CheetahString::new(),
72 0,
73 body,
74 true,
75 )
76 }
77
78 pub fn new_body(topic: impl Into<CheetahString>, body: Option<Bytes>) -> Self {
79 Self::with_details_body(
80 topic,
81 CheetahString::new(),
82 CheetahString::new(),
83 0,
84 body,
85 true,
86 )
87 }
88
89 pub fn with_tags(
90 topic: impl Into<CheetahString>,
91 tags: impl Into<CheetahString>,
92 body: &[u8],
93 ) -> Self {
94 Self::with_details(topic, tags, String::new(), 0, body, true)
95 }
96
97 pub fn with_keys(
98 topic: impl Into<CheetahString>,
99 tags: impl Into<CheetahString>,
100 keys: impl Into<CheetahString>,
101 body: &[u8],
102 ) -> Self {
103 Self::with_details(topic, tags, keys, 0, body, true)
104 }
105
106 pub fn with_details(
107 topic: impl Into<CheetahString>,
108 tags: impl Into<CheetahString>,
109 keys: impl Into<CheetahString>,
110 flag: i32,
111 body: &[u8],
112 wait_store_msg_ok: bool,
113 ) -> Self {
114 let topic = topic.into();
115 let tags = tags.into();
116 let keys = keys.into();
117 let mut message = Message {
118 topic,
119 flag,
120 body: Some(bytes::Bytes::copy_from_slice(body)),
121 ..Default::default()
122 };
123
124 if !tags.is_empty() {
125 message.set_tags(tags);
126 }
127
128 if !keys.is_empty() {
129 message.set_keys(keys);
130 }
131
132 message.set_wait_store_msg_ok(wait_store_msg_ok);
133 message
134 }
135
136 pub fn with_details_body(
137 topic: impl Into<CheetahString>,
138 tags: impl Into<CheetahString>,
139 keys: impl Into<CheetahString>,
140 flag: i32,
141 body: Option<Bytes>,
142 wait_store_msg_ok: bool,
143 ) -> Self {
144 let topic = topic.into();
145 let tags = tags.into();
146 let keys = keys.into();
147 let mut message = Message {
148 topic,
149 flag,
150 body,
151 ..Default::default()
152 };
153
154 if !tags.is_empty() {
155 message.set_tags(tags);
156 }
157
158 if !keys.is_empty() {
159 message.set_keys(keys);
160 }
161
162 message.set_wait_store_msg_ok(wait_store_msg_ok);
163 message
164 }
165
166 #[inline]
167 pub fn set_tags(&mut self, tags: CheetahString) {
168 self.properties.insert(
169 CheetahString::from_static_str(MessageConst::PROPERTY_TAGS),
170 tags,
171 );
172 }
173
174 #[inline]
175 pub fn set_keys(&mut self, keys: CheetahString) {
176 self.properties.insert(
177 CheetahString::from_static_str(MessageConst::PROPERTY_KEYS),
178 keys,
179 );
180 }
181
182 #[inline]
183 pub fn clear_property(&mut self, name: impl Into<CheetahString>) {
184 self.properties.remove(name.into().as_str());
185 }
186
187 #[inline]
188 pub fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
189 self.properties = properties;
190 }
191
192 #[inline]
193 pub fn get_property(&self, key: &CheetahString) -> Option<CheetahString> {
194 self.properties.get(key).cloned()
195 }
196
197 #[inline]
198 pub fn body(&self) -> Option<bytes::Bytes> {
199 self.body.as_ref().cloned()
200 }
201
202 #[inline]
203 pub fn flag(&self) -> i32 {
204 self.flag
205 }
206
207 #[inline]
208 pub fn topic(&self) -> &CheetahString {
209 &self.topic
210 }
211
212 #[inline]
213 pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
214 &self.properties
215 }
216
217 #[inline]
218 pub fn transaction_id(&self) -> Option<&str> {
219 self.transaction_id.as_deref()
220 }
221
222 #[inline]
223 pub fn get_tags(&self) -> Option<CheetahString> {
224 self.get_property(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
225 }
226
227 #[inline]
228 pub fn is_wait_store_msg_ok(&self) -> bool {
229 match self.get_property(&CheetahString::from_static_str(
230 MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
231 )) {
232 None => true,
233 Some(value) => value.parse().unwrap_or(true),
234 }
235 }
236
237 #[inline]
238 pub fn get_delay_time_level(&self) -> i32 {
239 match self.properties.get(MessageConst::PROPERTY_DELAY_TIME_LEVEL) {
240 Some(t) => t.parse::<i32>().unwrap_or(0),
241 None => 0,
242 }
243 }
244
245 #[inline]
246 pub fn set_delay_time_level(&mut self, level: i32) {
247 self.properties.insert(
248 CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
249 CheetahString::from(level.to_string()),
250 );
251 }
252
253 #[inline]
254 pub fn get_user_property(&self, name: impl Into<CheetahString>) -> Option<CheetahString> {
255 self.properties.get(name.into().as_str()).cloned()
256 }
257
258 #[inline]
259 pub fn as_any(&self) -> &dyn Any {
260 self
261 }
262
263 #[inline]
264 pub fn set_instance_id(&mut self, instance_id: impl Into<CheetahString>) {
265 self.properties.insert(
266 CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
267 instance_id.into(),
268 );
269 }
270}
271
272impl Display for Message {
273 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274 let properties_str = self
275 .properties
276 .iter()
277 .map(|(k, v)| format!("{k}: {v}"))
278 .collect::<Vec<_>>()
279 .join(", ");
280
281 let body_str = match &self.body {
282 Some(body) => format!("Some({body:?})"),
283 None => "None".to_string(),
284 };
285
286 let compressed_body_str = match &self.compressed_body {
287 Some(compressed_body) => format!("Some({compressed_body:?})"),
288 None => "None".to_string(),
289 };
290
291 let transaction_id_str = match &self.transaction_id {
292 Some(transaction_id) => transaction_id.to_string(),
293 None => "None".to_string(),
294 };
295
296 write!(
297 f,
298 "Message {{ topic: {}, flag: {}, properties: {{ {} }}, body: {}, compressed_body: {}, \
299 transaction_id: {} }}",
300 self.topic,
301 self.flag,
302 properties_str,
303 body_str,
304 compressed_body_str,
305 transaction_id_str
306 )
307 }
308}
309
310#[allow(unused_variables)]
311impl MessageTrait for Message {
312 #[inline]
313 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
314 self.properties.insert(key, value);
315 }
316
317 #[inline]
318 fn clear_property(&mut self, name: &str) {
319 self.properties.remove(name);
320 }
321
322 #[inline]
323 fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
324 self.properties.get(name).cloned()
325 }
326
327 #[inline]
328 fn get_topic(&self) -> &CheetahString {
329 &self.topic
330 }
331
332 #[inline]
333 fn set_topic(&mut self, topic: CheetahString) {
334 self.topic = topic;
335 }
336
337 #[inline]
338 fn get_flag(&self) -> i32 {
339 self.flag
340 }
341
342 #[inline]
343 fn set_flag(&mut self, flag: i32) {
344 self.flag = flag;
345 }
346
347 #[inline]
348 fn get_body(&self) -> Option<&Bytes> {
349 self.body.as_ref()
350 }
351
352 #[inline]
353 fn set_body(&mut self, body: Bytes) {
354 self.body = Some(body);
355 }
356
357 #[inline]
358 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
359 &self.properties
360 }
361
362 #[inline]
363 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
364 self.properties = properties;
365 }
366
367 #[inline]
368 fn get_transaction_id(&self) -> Option<&CheetahString> {
369 self.transaction_id.as_ref()
370 }
371
372 #[inline]
373 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
374 self.transaction_id = Some(transaction_id);
375 }
376
377 #[inline]
378 fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
379 &mut self.compressed_body
380 }
381
382 #[inline]
383 fn get_compressed_body(&self) -> Option<&Bytes> {
384 self.compressed_body.as_ref()
385 }
386
387 #[inline]
388 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
389 self.compressed_body = Some(compressed_body);
390 }
391
392 #[inline]
393 fn take_body(&mut self) -> Option<Bytes> {
394 self.body.take()
395 }
396
397 #[inline]
398 fn as_any(&self) -> &dyn Any {
399 self
400 }
401
402 #[inline]
403 fn as_any_mut(&mut self) -> &mut dyn Any {
404 self
405 }
406}
407
408pub fn parse_topic_filter_type(sys_flag: i32) -> TopicFilterType {
409 if (sys_flag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG {
410 TopicFilterType::MultiTag
411 } else {
412 TopicFilterType::SingleTag
413 }
414}
415
416pub fn tags_string2tags_code(tags: Option<&CheetahString>) -> i64 {
417 if tags.is_none() {
418 return 0;
419 }
420 let tags = tags.unwrap();
421 if tags.is_empty() {
422 return 0;
423 }
424 JavaStringHasher::hash_str(tags.as_str()) as i64
425}