rocketmq_common/common/message/
message_single.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::hash::DefaultHasher;
23use std::hash::Hash;
24use std::hash::Hasher;
25use std::net::SocketAddr;
26
27use bytes::Buf;
28use bytes::BufMut;
29use bytes::Bytes;
30use cheetah_string::CheetahString;
31
32use crate::common::hasher::string_hasher::JavaStringHasher;
33use crate::common::message::message_ext::MessageExt;
34use crate::common::message::MessageConst;
35use crate::common::message::MessageTrait;
36use crate::common::message::MessageVersion;
37use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
38use crate::common::TopicFilterType;
39use crate::MessageUtils;
40
/// A message made of a topic, an integer flag, a string property map and an
/// optional body (plus an optional compressed copy of that body).
///
/// Tags, keys, delay level, instance id and the wait-store flag are not
/// dedicated fields; the accessors in this file store and read them through
/// `properties` under `MessageConst` keys.
#[derive(Clone, Debug)]
pub struct Message {
    /// Topic this message belongs to.
    pub topic: CheetahString,
    /// Integer flag value; its bit meaning is not defined in this file.
    pub flag: i32,
    /// Key/value properties attached to the message.
    pub properties: HashMap<CheetahString, CheetahString>,
    /// Original (uncompressed) body bytes, if any.
    pub body: Option<bytes::Bytes>,
    /// Compressed body bytes; `None` when no compression was needed.
    pub compressed_body: Option<bytes::Bytes>,
    /// Transaction id, if one has been assigned.
    pub transaction_id: Option<CheetahString>,
}
52
53impl Default for Message {
54    fn default() -> Self {
55        Self {
56            topic: CheetahString::new(),
57            flag: 0,
58            properties: HashMap::new(),
59            body: None,
60            compressed_body: None,
61            transaction_id: None,
62        }
63    }
64}
65
66impl Message {
67    pub fn new(topic: impl Into<CheetahString>, body: &[u8]) -> Self {
68        Self::with_details(
69            topic,
70            CheetahString::new(),
71            CheetahString::new(),
72            0,
73            body,
74            true,
75        )
76    }
77
78    pub fn new_body(topic: impl Into<CheetahString>, body: Option<Bytes>) -> Self {
79        Self::with_details_body(
80            topic,
81            CheetahString::new(),
82            CheetahString::new(),
83            0,
84            body,
85            true,
86        )
87    }
88
89    pub fn with_tags(
90        topic: impl Into<CheetahString>,
91        tags: impl Into<CheetahString>,
92        body: &[u8],
93    ) -> Self {
94        Self::with_details(topic, tags, String::new(), 0, body, true)
95    }
96
97    pub fn with_keys(
98        topic: impl Into<CheetahString>,
99        tags: impl Into<CheetahString>,
100        keys: impl Into<CheetahString>,
101        body: &[u8],
102    ) -> Self {
103        Self::with_details(topic, tags, keys, 0, body, true)
104    }
105
106    pub fn with_details(
107        topic: impl Into<CheetahString>,
108        tags: impl Into<CheetahString>,
109        keys: impl Into<CheetahString>,
110        flag: i32,
111        body: &[u8],
112        wait_store_msg_ok: bool,
113    ) -> Self {
114        let topic = topic.into();
115        let tags = tags.into();
116        let keys = keys.into();
117        let mut message = Message {
118            topic,
119            flag,
120            body: Some(bytes::Bytes::copy_from_slice(body)),
121            ..Default::default()
122        };
123
124        if !tags.is_empty() {
125            message.set_tags(tags);
126        }
127
128        if !keys.is_empty() {
129            message.set_keys(keys);
130        }
131
132        message.set_wait_store_msg_ok(wait_store_msg_ok);
133        message
134    }
135
136    pub fn with_details_body(
137        topic: impl Into<CheetahString>,
138        tags: impl Into<CheetahString>,
139        keys: impl Into<CheetahString>,
140        flag: i32,
141        body: Option<Bytes>,
142        wait_store_msg_ok: bool,
143    ) -> Self {
144        let topic = topic.into();
145        let tags = tags.into();
146        let keys = keys.into();
147        let mut message = Message {
148            topic,
149            flag,
150            body,
151            ..Default::default()
152        };
153
154        if !tags.is_empty() {
155            message.set_tags(tags);
156        }
157
158        if !keys.is_empty() {
159            message.set_keys(keys);
160        }
161
162        message.set_wait_store_msg_ok(wait_store_msg_ok);
163        message
164    }
165
166    #[inline]
167    pub fn set_tags(&mut self, tags: CheetahString) {
168        self.properties.insert(
169            CheetahString::from_static_str(MessageConst::PROPERTY_TAGS),
170            tags,
171        );
172    }
173
174    #[inline]
175    pub fn set_keys(&mut self, keys: CheetahString) {
176        self.properties.insert(
177            CheetahString::from_static_str(MessageConst::PROPERTY_KEYS),
178            keys,
179        );
180    }
181
182    #[inline]
183    pub fn clear_property(&mut self, name: impl Into<CheetahString>) {
184        self.properties.remove(name.into().as_str());
185    }
186
187    #[inline]
188    pub fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
189        self.properties = properties;
190    }
191
192    #[inline]
193    pub fn get_property(&self, key: &CheetahString) -> Option<CheetahString> {
194        self.properties.get(key).cloned()
195    }
196
197    #[inline]
198    pub fn body(&self) -> Option<bytes::Bytes> {
199        self.body.as_ref().cloned()
200    }
201
202    #[inline]
203    pub fn flag(&self) -> i32 {
204        self.flag
205    }
206
207    #[inline]
208    pub fn topic(&self) -> &CheetahString {
209        &self.topic
210    }
211
212    #[inline]
213    pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
214        &self.properties
215    }
216
217    #[inline]
218    pub fn transaction_id(&self) -> Option<&str> {
219        self.transaction_id.as_deref()
220    }
221
222    #[inline]
223    pub fn get_tags(&self) -> Option<CheetahString> {
224        self.get_property(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS))
225    }
226
227    #[inline]
228    pub fn is_wait_store_msg_ok(&self) -> bool {
229        match self.get_property(&CheetahString::from_static_str(
230            MessageConst::PROPERTY_WAIT_STORE_MSG_OK,
231        )) {
232            None => true,
233            Some(value) => value.parse().unwrap_or(true),
234        }
235    }
236
237    #[inline]
238    pub fn get_delay_time_level(&self) -> i32 {
239        match self.properties.get(MessageConst::PROPERTY_DELAY_TIME_LEVEL) {
240            Some(t) => t.parse::<i32>().unwrap_or(0),
241            None => 0,
242        }
243    }
244
245    #[inline]
246    pub fn set_delay_time_level(&mut self, level: i32) {
247        self.properties.insert(
248            CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL),
249            CheetahString::from(level.to_string()),
250        );
251    }
252
253    #[inline]
254    pub fn get_user_property(&self, name: impl Into<CheetahString>) -> Option<CheetahString> {
255        self.properties.get(name.into().as_str()).cloned()
256    }
257
258    #[inline]
259    pub fn as_any(&self) -> &dyn Any {
260        self
261    }
262
263    #[inline]
264    pub fn set_instance_id(&mut self, instance_id: impl Into<CheetahString>) {
265        self.properties.insert(
266            CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID),
267            instance_id.into(),
268        );
269    }
270}
271
272impl Display for Message {
273    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274        let properties_str = self
275            .properties
276            .iter()
277            .map(|(k, v)| format!("{k}: {v}"))
278            .collect::<Vec<_>>()
279            .join(", ");
280
281        let body_str = match &self.body {
282            Some(body) => format!("Some({body:?})"),
283            None => "None".to_string(),
284        };
285
286        let compressed_body_str = match &self.compressed_body {
287            Some(compressed_body) => format!("Some({compressed_body:?})"),
288            None => "None".to_string(),
289        };
290
291        let transaction_id_str = match &self.transaction_id {
292            Some(transaction_id) => transaction_id.to_string(),
293            None => "None".to_string(),
294        };
295
296        write!(
297            f,
298            "Message {{ topic: {}, flag: {}, properties: {{ {} }}, body: {}, compressed_body: {}, \
299             transaction_id: {} }}",
300            self.topic,
301            self.flag,
302            properties_str,
303            body_str,
304            compressed_body_str,
305            transaction_id_str
306        )
307    }
308}
309
310#[allow(unused_variables)]
311impl MessageTrait for Message {
312    #[inline]
313    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
314        self.properties.insert(key, value);
315    }
316
317    #[inline]
318    fn clear_property(&mut self, name: &str) {
319        self.properties.remove(name);
320    }
321
322    #[inline]
323    fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
324        self.properties.get(name).cloned()
325    }
326
327    #[inline]
328    fn get_topic(&self) -> &CheetahString {
329        &self.topic
330    }
331
332    #[inline]
333    fn set_topic(&mut self, topic: CheetahString) {
334        self.topic = topic;
335    }
336
337    #[inline]
338    fn get_flag(&self) -> i32 {
339        self.flag
340    }
341
342    #[inline]
343    fn set_flag(&mut self, flag: i32) {
344        self.flag = flag;
345    }
346
347    #[inline]
348    fn get_body(&self) -> Option<&Bytes> {
349        self.body.as_ref()
350    }
351
352    #[inline]
353    fn set_body(&mut self, body: Bytes) {
354        self.body = Some(body);
355    }
356
357    #[inline]
358    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
359        &self.properties
360    }
361
362    #[inline]
363    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
364        self.properties = properties;
365    }
366
367    #[inline]
368    fn get_transaction_id(&self) -> Option<&CheetahString> {
369        self.transaction_id.as_ref()
370    }
371
372    #[inline]
373    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
374        self.transaction_id = Some(transaction_id);
375    }
376
377    #[inline]
378    fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
379        &mut self.compressed_body
380    }
381
382    #[inline]
383    fn get_compressed_body(&self) -> Option<&Bytes> {
384        self.compressed_body.as_ref()
385    }
386
387    #[inline]
388    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
389        self.compressed_body = Some(compressed_body);
390    }
391
392    #[inline]
393    fn take_body(&mut self) -> Option<Bytes> {
394        self.body.take()
395    }
396
397    #[inline]
398    fn as_any(&self) -> &dyn Any {
399        self
400    }
401
402    #[inline]
403    fn as_any_mut(&mut self) -> &mut dyn Any {
404        self
405    }
406}
407
408pub fn parse_topic_filter_type(sys_flag: i32) -> TopicFilterType {
409    if (sys_flag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG {
410        TopicFilterType::MultiTag
411    } else {
412        TopicFilterType::SingleTag
413    }
414}
415
416pub fn tags_string2tags_code(tags: Option<&CheetahString>) -> i64 {
417    if tags.is_none() {
418        return 0;
419    }
420    let tags = tags.unwrap();
421    if tags.is_empty() {
422        return 0;
423    }
424    JavaStringHasher::hash_str(tags.as_str()) as i64
425}