Skip to main content

rocketmq_common/common/message/
message_ext_broker_inner.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::fmt::Formatter;
21use std::net::SocketAddr;
22
23use bytes::Bytes;
24use cheetah_string::CheetahString;
25use rocketmq_rust::ArcMut;
26
27use crate::common::hasher::string_hasher::JavaStringHasher;
28use crate::common::message::message_ext::MessageExt;
29use crate::common::message::message_single::Message;
30use crate::common::message::MessageTrait;
31use crate::common::message::MessageVersion;
32use crate::common::TopicFilterType;
33use crate::MessageUtils;
34
35#[derive(Default)]
36pub struct MessageExtBrokerInner {
37    pub message_ext_inner: MessageExt,
38    pub properties_string: CheetahString,
39    pub tags_code: i64,
40    pub encoded_buff: Option<bytes::BytesMut>,
41    pub encode_completed: bool,
42    pub version: MessageVersion,
43}
44
45impl MessageExtBrokerInner {
46    const VERSION: MessageVersion = MessageVersion::V1;
47
48    #[inline]
49    pub fn delete_property(&mut self, name: impl Into<CheetahString>) {
50        let name = name.into();
51        self.message_ext_inner.message.clear_property(name.as_str());
52        self.properties_string = CheetahString::from_string(MessageUtils::delete_property(
53            self.properties_string.as_str(),
54            name.as_str(),
55        ));
56    }
57
58    #[inline]
59    pub fn with_version(&mut self, version: MessageVersion) {
60        self.version = version;
61    }
62
63    #[inline]
64    pub fn version(&self) -> MessageVersion {
65        self.version
66    }
67
68    #[inline]
69    pub fn topic(&self) -> &CheetahString {
70        self.message_ext_inner.topic()
71    }
72
73    #[inline]
74    pub fn get_topic(&self) -> &CheetahString {
75        self.message_ext_inner.topic()
76    }
77
78    #[inline]
79    pub fn born_host(&self) -> SocketAddr {
80        self.message_ext_inner.born_host()
81    }
82
83    #[inline]
84    pub fn store_host(&self) -> SocketAddr {
85        self.message_ext_inner.store_host()
86    }
87
88    #[inline]
89    pub fn with_born_host_v6_flag(&mut self) {
90        self.message_ext_inner.with_born_host_v6_flag()
91    }
92
93    #[inline]
94    pub fn with_store_host_v6_flag(&mut self) {
95        self.message_ext_inner.with_store_host_v6_flag()
96    }
97
98    #[inline]
99    pub fn body(&self) -> Option<bytes::Bytes> {
100        self.message_ext_inner.body()
101    }
102
103    #[inline]
104    pub fn sys_flag(&self) -> i32 {
105        self.message_ext_inner.sys_flag()
106    }
107
108    #[inline]
109    pub fn body_crc(&self) -> u32 {
110        self.message_ext_inner.body_crc()
111    }
112
113    #[inline]
114    pub fn queue_id(&self) -> i32 {
115        self.message_ext_inner.queue_id()
116    }
117
118    #[inline]
119    pub fn flag(&self) -> i32 {
120        self.message_ext_inner.flag()
121    }
122
123    #[inline]
124    pub fn born_timestamp(&self) -> i64 {
125        self.message_ext_inner.born_timestamp()
126    }
127
128    #[inline]
129    pub fn store_timestamp(&self) -> i64 {
130        self.message_ext_inner.store_timestamp()
131    }
132
133    #[inline]
134    pub fn born_host_bytes(&self) -> bytes::Bytes {
135        self.message_ext_inner.born_host_bytes()
136    }
137
138    #[inline]
139    pub fn store_host_bytes(&self) -> bytes::Bytes {
140        self.message_ext_inner.born_store_bytes()
141    }
142
143    #[inline]
144    pub fn reconsume_times(&self) -> i32 {
145        self.message_ext_inner.reconsume_times()
146    }
147
148    #[inline]
149    pub fn prepared_transaction_offset(&self) -> i64 {
150        self.message_ext_inner.prepared_transaction_offset()
151    }
152
153    #[inline]
154    pub fn property(&self, name: &str) -> Option<CheetahString> {
155        self.message_ext_inner.properties().get(name).cloned()
156    }
157
158    #[inline]
159    pub fn properties_string(&self) -> &str {
160        self.properties_string.as_str()
161    }
162
163    #[inline]
164    pub fn queue_offset(&self) -> i64 {
165        self.message_ext_inner.queue_offset()
166    }
167
168    #[inline]
169    pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> i64 {
170        if tags.is_empty() {
171            return 0;
172        }
173        JavaStringHasher::hash_str(tags) as i64
174    }
175
176    #[inline]
177    pub fn tags_string_to_tags_code(tags: &str) -> i64 {
178        if tags.is_empty() {
179            return 0;
180        }
181        JavaStringHasher::hash_str(tags) as i64
182    }
183
184    #[inline]
185    pub fn get_tags(&self) -> Option<CheetahString> {
186        self.message_ext_inner.get_tags()
187    }
188
189    #[inline]
190    pub fn get_transaction_id(&self) -> Option<&CheetahString> {
191        self.message_ext_inner.get_transaction_id()
192    }
193
194    #[inline]
195    pub fn is_wait_store_msg_ok(&self) -> bool {
196        self.message_ext_inner.message.is_wait_store_msg_ok()
197    }
198
199    #[inline]
200    pub fn body_len(&self) -> usize {
201        self.message_ext_inner.message.get_body().unwrap().len()
202    }
203}
204
205impl fmt::Display for MessageExtBrokerInner {
206    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207        let encoded_buff_str = match &self.encoded_buff {
208            Some(encoded_buff) =>
209            /* format!("Some({:?})", encoded_buff) */
210            {
211                "****".to_string()
212            }
213            None => "None".to_string(),
214        };
215
216        write!(
217            f,
218            "MessageExtBrokerInner {{ message_ext_inner: {}, properties_string: {}, tags_code: {}, encoded_buff: {}, \
219             encode_completed: {}, version: {} }}",
220            self.message_ext_inner,
221            self.properties_string,
222            self.tags_code,
223            encoded_buff_str,
224            self.encode_completed,
225            self.version
226        )
227    }
228}
229
230impl Debug for MessageExtBrokerInner {
231    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
232        let encoded_buff_str = match &self.encoded_buff {
233            Some(encoded_buff) =>
234            /* format!("Some({:?})", encoded_buff) */
235            {
236                "****".to_string()
237            }
238            None => "None".to_string(),
239        };
240
241        write!(
242            f,
243            "MessageExtBrokerInner {{ message_ext_inner: {:?}, properties_string: {}, tags_code: {}, encoded_buff: \
244             {}, encode_completed: {}, version: {} }}",
245            self.message_ext_inner,
246            self.properties_string,
247            self.tags_code,
248            encoded_buff_str,
249            self.encode_completed,
250            self.version
251        )
252    }
253}
254
255impl MessageTrait for MessageExtBrokerInner {
256    #[inline]
257    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
258        self.message_ext_inner.put_property(key, value);
259    }
260
261    #[inline]
262    fn clear_property(&mut self, name: &str) {
263        self.message_ext_inner.clear_property(name);
264    }
265
266    #[inline]
267    fn property(&self, name: &CheetahString) -> Option<CheetahString> {
268        self.message_ext_inner.property(name)
269    }
270
271    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
272        self.message_ext_inner.property_ref(name)
273    }
274
275    #[inline]
276    fn topic(&self) -> &CheetahString {
277        self.message_ext_inner.topic()
278    }
279
280    #[inline]
281    fn set_topic(&mut self, topic: CheetahString) {
282        self.message_ext_inner.set_topic(topic);
283    }
284
285    #[inline]
286    fn get_flag(&self) -> i32 {
287        self.message_ext_inner.get_flag()
288    }
289
290    #[inline]
291    fn set_flag(&mut self, flag: i32) {
292        self.message_ext_inner.set_flag(flag);
293    }
294
295    #[inline]
296    fn get_body(&self) -> Option<&Bytes> {
297        self.message_ext_inner.get_body()
298    }
299
300    #[inline]
301    fn set_body(&mut self, body: Bytes) {
302        self.message_ext_inner.set_body(body);
303    }
304
305    #[inline]
306    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
307        self.message_ext_inner.get_properties()
308    }
309
310    #[inline]
311    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
312        self.message_ext_inner.set_properties(properties);
313    }
314
315    #[inline]
316    fn transaction_id(&self) -> Option<&CheetahString> {
317        self.message_ext_inner.transaction_id()
318    }
319
320    #[inline]
321    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
322        self.message_ext_inner.set_transaction_id(transaction_id);
323    }
324
325    #[inline]
326    fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
327        self.message_ext_inner.get_compressed_body_mut()
328    }
329
330    #[inline]
331    fn get_compressed_body(&self) -> Option<&Bytes> {
332        self.message_ext_inner.get_compressed_body()
333    }
334
335    #[inline]
336    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
337        self.message_ext_inner.set_compressed_body_mut(compressed_body);
338    }
339
340    #[inline]
341    fn take_body(&mut self) -> Option<Bytes> {
342        self.message_ext_inner.take_body()
343    }
344
345    #[inline]
346    fn as_any(&self) -> &dyn Any {
347        self
348    }
349
350    #[inline]
351    fn as_any_mut(&mut self) -> &mut dyn Any {
352        self
353    }
354}
355
356// Conversion to new BrokerMessage type
357impl From<MessageExtBrokerInner> for crate::common::message::broker_message::BrokerMessage {
358    fn from(inner: MessageExtBrokerInner) -> Self {
359        use crate::common::message::broker_message::BrokerMessage;
360        use crate::common::message::message_envelope::MessageEnvelope;
361
362        let envelope = MessageEnvelope::from(inner.message_ext_inner);
363
364        BrokerMessage::new(envelope, inner.tags_code, inner.properties_string, inner.version)
365    }
366}
367
368// Conversion from new BrokerMessage type
369impl From<crate::common::message::broker_message::BrokerMessage> for MessageExtBrokerInner {
370    fn from(broker_msg: crate::common::message::broker_message::BrokerMessage) -> Self {
371        use crate::common::message::message_ext::MessageExt;
372
373        let message_ext_inner = MessageExt::from(broker_msg.envelope().clone());
374
375        Self {
376            message_ext_inner,
377            properties_string: CheetahString::from_string(broker_msg.properties_string().to_string()),
378            tags_code: broker_msg.tags_code(),
379            encoded_buff: broker_msg.encoded_buff().cloned(),
380            encode_completed: broker_msg.is_encode_completed(),
381            version: broker_msg.version(),
382        }
383    }
384}
385
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::MessageExtBrokerInner;
    use crate::common::message::MessageTrait;

    /// A transaction id set through the trait is visible through the inherent getter.
    #[test]
    fn message_ext_broker_inner_get_transaction_id_delegates_to_inner_message() {
        let expected = CheetahString::from("tx-123");
        let mut inner = MessageExtBrokerInner::default();

        MessageTrait::set_transaction_id(&mut inner, expected.clone());

        assert_eq!(inner.get_transaction_id(), Some(&expected));
    }
}