rocketmq_common/common/message/
message_ext_broker_inner.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Debug;
21use std::fmt::Display;
22use std::fmt::Formatter;
23use std::net::SocketAddr;
24
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_rust::ArcMut;
28
29use crate::common::hasher::string_hasher::JavaStringHasher;
30use crate::common::message::message_ext::MessageExt;
31use crate::common::message::message_single::Message;
32use crate::common::message::MessageTrait;
33use crate::common::message::MessageVersion;
34use crate::common::TopicFilterType;
35use crate::MessageUtils;
36
37#[derive(Default)]
38pub struct MessageExtBrokerInner {
39    pub message_ext_inner: MessageExt,
40    pub properties_string: CheetahString,
41    pub tags_code: i64,
42    pub encoded_buff: Option<bytes::BytesMut>,
43    pub encode_completed: bool,
44    pub version: MessageVersion,
45}
46
47impl MessageExtBrokerInner {
48    const VERSION: MessageVersion = MessageVersion::V1;
49
50    #[inline]
51    pub fn delete_property(&mut self, name: impl Into<CheetahString>) {
52        let name = name.into();
53        self.message_ext_inner.message.clear_property(name.as_str());
54        self.properties_string = CheetahString::from_string(MessageUtils::delete_property(
55            self.properties_string.as_str(),
56            name.as_str(),
57        ));
58    }
59
60    #[inline]
61    pub fn with_version(&mut self, version: MessageVersion) {
62        self.version = version;
63    }
64
65    #[inline]
66    pub fn version(&self) -> MessageVersion {
67        self.version
68    }
69
70    #[inline]
71    pub fn topic(&self) -> &CheetahString {
72        self.message_ext_inner.topic()
73    }
74
75    #[inline]
76    pub fn get_topic(&self) -> &CheetahString {
77        self.message_ext_inner.get_topic()
78    }
79
80    #[inline]
81    pub fn born_host(&self) -> SocketAddr {
82        self.message_ext_inner.born_host()
83    }
84
85    #[inline]
86    pub fn store_host(&self) -> SocketAddr {
87        self.message_ext_inner.store_host()
88    }
89
90    #[inline]
91    pub fn with_born_host_v6_flag(&mut self) {
92        self.message_ext_inner.with_born_host_v6_flag()
93    }
94
95    #[inline]
96    pub fn with_store_host_v6_flag(&mut self) {
97        self.message_ext_inner.with_store_host_v6_flag()
98    }
99
100    #[inline]
101    pub fn body(&self) -> Option<bytes::Bytes> {
102        self.message_ext_inner.body()
103    }
104
105    #[inline]
106    pub fn sys_flag(&self) -> i32 {
107        self.message_ext_inner.sys_flag()
108    }
109
110    #[inline]
111    pub fn body_crc(&self) -> u32 {
112        self.message_ext_inner.body_crc()
113    }
114
115    #[inline]
116    pub fn queue_id(&self) -> i32 {
117        self.message_ext_inner.queue_id()
118    }
119
120    #[inline]
121    pub fn flag(&self) -> i32 {
122        self.message_ext_inner.flag()
123    }
124
125    #[inline]
126    pub fn born_timestamp(&self) -> i64 {
127        self.message_ext_inner.born_timestamp()
128    }
129
130    #[inline]
131    pub fn store_timestamp(&self) -> i64 {
132        self.message_ext_inner.store_timestamp()
133    }
134
135    #[inline]
136    pub fn born_host_bytes(&self) -> bytes::Bytes {
137        self.message_ext_inner.born_host_bytes()
138    }
139
140    #[inline]
141    pub fn store_host_bytes(&self) -> bytes::Bytes {
142        self.message_ext_inner.born_store_bytes()
143    }
144
145    #[inline]
146    pub fn reconsume_times(&self) -> i32 {
147        self.message_ext_inner.reconsume_times()
148    }
149
150    #[inline]
151    pub fn prepared_transaction_offset(&self) -> i64 {
152        self.message_ext_inner.prepared_transaction_offset()
153    }
154
155    #[inline]
156    pub fn property(&self, name: &str) -> Option<CheetahString> {
157        self.message_ext_inner.properties().get(name).cloned()
158    }
159
160    #[inline]
161    pub fn properties_string(&self) -> &str {
162        self.properties_string.as_str()
163    }
164
165    #[inline]
166    pub fn queue_offset(&self) -> i64 {
167        self.message_ext_inner.queue_offset()
168    }
169
170    #[inline]
171    pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> i64 {
172        if tags.is_empty() {
173            return 0;
174        }
175        JavaStringHasher::hash_str(tags) as i64
176    }
177
178    #[inline]
179    pub fn tags_string_to_tags_code(tags: &str) -> i64 {
180        if tags.is_empty() {
181            return 0;
182        }
183        JavaStringHasher::hash_str(tags) as i64
184    }
185
186    #[inline]
187    pub fn get_tags(&self) -> Option<CheetahString> {
188        self.message_ext_inner.get_tags()
189    }
190
191    #[inline]
192    pub fn is_wait_store_msg_ok(&self) -> bool {
193        self.message_ext_inner.message.is_wait_store_msg_ok()
194    }
195
196    #[inline]
197    pub fn body_len(&self) -> usize {
198        self.message_ext_inner.message.body.as_ref().unwrap().len()
199    }
200}
201
202impl fmt::Display for MessageExtBrokerInner {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        let encoded_buff_str = match &self.encoded_buff {
205            Some(encoded_buff) =>
206            /* format!("Some({:?})", encoded_buff) */
207            {
208                "****".to_string()
209            }
210            None => "None".to_string(),
211        };
212
213        write!(
214            f,
215            "MessageExtBrokerInner {{ message_ext_inner: {}, properties_string: {}, tags_code: \
216             {}, encoded_buff: {}, encode_completed: {}, version: {} }}",
217            self.message_ext_inner,
218            self.properties_string,
219            self.tags_code,
220            encoded_buff_str,
221            self.encode_completed,
222            self.version
223        )
224    }
225}
226
227impl Debug for MessageExtBrokerInner {
228    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
229        let encoded_buff_str = match &self.encoded_buff {
230            Some(encoded_buff) =>
231            /* format!("Some({:?})", encoded_buff) */
232            {
233                "****".to_string()
234            }
235            None => "None".to_string(),
236        };
237
238        write!(
239            f,
240            "MessageExtBrokerInner {{ message_ext_inner: {:?}, properties_string: {}, tags_code: \
241             {}, encoded_buff: {}, encode_completed: {}, version: {} }}",
242            self.message_ext_inner,
243            self.properties_string,
244            self.tags_code,
245            encoded_buff_str,
246            self.encode_completed,
247            self.version
248        )
249    }
250}
251
252impl MessageTrait for MessageExtBrokerInner {
253    #[inline]
254    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
255        self.message_ext_inner.put_property(key, value);
256    }
257
258    #[inline]
259    fn clear_property(&mut self, name: &str) {
260        self.message_ext_inner.clear_property(name);
261    }
262
263    #[inline]
264    fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
265        self.message_ext_inner.get_property(name)
266    }
267
268    #[inline]
269    fn get_topic(&self) -> &CheetahString {
270        self.message_ext_inner.get_topic()
271    }
272
273    #[inline]
274    fn set_topic(&mut self, topic: CheetahString) {
275        self.message_ext_inner.set_topic(topic);
276    }
277
278    #[inline]
279    fn get_flag(&self) -> i32 {
280        self.message_ext_inner.get_flag()
281    }
282
283    #[inline]
284    fn set_flag(&mut self, flag: i32) {
285        self.message_ext_inner.set_flag(flag);
286    }
287
288    #[inline]
289    fn get_body(&self) -> Option<&Bytes> {
290        self.message_ext_inner.get_body()
291    }
292
293    #[inline]
294    fn set_body(&mut self, body: Bytes) {
295        self.message_ext_inner.set_body(body);
296    }
297
298    #[inline]
299    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
300        self.message_ext_inner.get_properties()
301    }
302
303    #[inline]
304    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
305        self.message_ext_inner.set_properties(properties);
306    }
307
308    #[inline]
309    fn get_transaction_id(&self) -> Option<&CheetahString> {
310        self.message_ext_inner.get_transaction_id()
311    }
312
313    #[inline]
314    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
315        self.message_ext_inner.set_transaction_id(transaction_id);
316    }
317
318    #[inline]
319    fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
320        self.message_ext_inner.get_compressed_body_mut()
321    }
322
323    #[inline]
324    fn get_compressed_body(&self) -> Option<&Bytes> {
325        self.message_ext_inner.get_compressed_body()
326    }
327
328    #[inline]
329    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
330        self.message_ext_inner
331            .set_compressed_body_mut(compressed_body);
332    }
333
334    #[inline]
335    fn take_body(&mut self) -> Option<Bytes> {
336        self.message_ext_inner.take_body()
337    }
338
339    #[inline]
340    fn as_any(&self) -> &dyn Any {
341        self
342    }
343
344    #[inline]
345    fn as_any_mut(&mut self) -> &mut dyn Any {
346        self
347    }
348}