Skip to main content

rocketmq_common/common/message/
message_ext.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Display;
19use std::fmt::Formatter;
20use std::net::SocketAddr;
21
22use bytes::Buf;
23use bytes::BufMut;
24use bytes::Bytes;
25use cheetah_string::CheetahString;
26
27use crate::common::message::message_single::Message;
28use crate::common::message::MessageTrait;
29use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
30
31#[derive(Clone, Debug)]
32pub struct MessageExt {
33    pub message: Message,
34    pub broker_name: CheetahString,
35    pub queue_id: i32,
36    pub store_size: i32,
37    pub queue_offset: i64,
38    pub sys_flag: i32,
39    pub born_timestamp: i64,
40    pub born_host: SocketAddr,
41    pub store_timestamp: i64,
42    pub store_host: SocketAddr,
43    pub msg_id: CheetahString,
44    pub commit_log_offset: i64,
45    pub body_crc: u32,
46    pub reconsume_times: i32,
47    pub prepared_transaction_offset: i64,
48}
49
50impl MessageExt {
51    pub fn socket_address_2_byte_buffer(ip: &SocketAddr) -> bytes::Bytes {
52        match ip {
53            SocketAddr::V4(value) => {
54                let mut byte_buffer = bytes::BytesMut::with_capacity(4 + 4);
55                byte_buffer.put_slice(&value.ip().octets());
56                byte_buffer.put_i32(value.port() as i32);
57                byte_buffer.copy_to_bytes(byte_buffer.len())
58            }
59            SocketAddr::V6(value) => {
60                let mut byte_buffer = bytes::BytesMut::with_capacity(16 + 4);
61                byte_buffer.put_slice(&value.ip().octets());
62                byte_buffer.put_i32(value.port() as i32);
63                byte_buffer.copy_to_bytes(byte_buffer.len())
64            }
65        }
66    }
67
68    #[inline]
69    pub fn born_host_bytes(&self) -> bytes::Bytes {
70        Self::socket_address_2_byte_buffer(&self.born_host)
71    }
72
73    #[inline]
74    pub fn born_store_bytes(&self) -> bytes::Bytes {
75        Self::socket_address_2_byte_buffer(&self.store_host)
76    }
77
78    #[inline]
79    pub fn topic(&self) -> &CheetahString {
80        self.message.topic()
81    }
82
83    #[inline]
84    pub fn born_host(&self) -> SocketAddr {
85        self.born_host
86    }
87
88    #[inline]
89    pub fn store_host(&self) -> SocketAddr {
90        self.store_host
91    }
92
93    #[inline]
94    pub fn with_born_host_v6_flag(&mut self) {
95        self.sys_flag |= MessageSysFlag::BORNHOST_V6_FLAG;
96    }
97
98    #[inline]
99    pub fn with_store_host_v6_flag(&mut self) {
100        self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
101    }
102
103    #[inline]
104    pub fn body(&self) -> Option<bytes::Bytes> {
105        self.message.body()
106    }
107
108    #[inline]
109    pub fn sys_flag(&self) -> i32 {
110        self.sys_flag
111    }
112
113    #[inline]
114    pub fn body_crc(&self) -> u32 {
115        self.body_crc
116    }
117
118    #[inline]
119    pub fn queue_id(&self) -> i32 {
120        self.queue_id
121    }
122
123    #[inline]
124    pub fn flag(&self) -> i32 {
125        self.message.flag()
126    }
127
128    #[inline]
129    pub fn message_inner(&self) -> &Message {
130        &self.message
131    }
132
133    #[inline]
134    pub fn broker_name(&self) -> &str {
135        &self.broker_name
136    }
137
138    #[inline]
139    pub fn store_size(&self) -> i32 {
140        self.store_size
141    }
142
143    #[inline]
144    pub fn queue_offset(&self) -> i64 {
145        self.queue_offset
146    }
147
148    #[inline]
149    pub fn born_timestamp(&self) -> i64 {
150        self.born_timestamp
151    }
152
153    #[inline]
154    pub fn store_timestamp(&self) -> i64 {
155        self.store_timestamp
156    }
157
158    #[inline]
159    pub fn msg_id(&self) -> &CheetahString {
160        &self.msg_id
161    }
162
163    #[inline]
164    pub fn commit_log_offset(&self) -> i64 {
165        self.commit_log_offset
166    }
167
168    #[inline]
169    pub fn reconsume_times(&self) -> i32 {
170        self.reconsume_times
171    }
172
173    #[inline]
174    pub fn prepared_transaction_offset(&self) -> i64 {
175        self.prepared_transaction_offset
176    }
177    #[inline]
178    pub fn set_message_inner(&mut self, message_inner: Message) {
179        self.message = message_inner;
180    }
181    #[inline]
182    pub fn set_broker_name(&mut self, broker_name: CheetahString) {
183        self.broker_name = broker_name;
184    }
185    #[inline]
186    pub fn set_queue_id(&mut self, queue_id: i32) {
187        self.queue_id = queue_id;
188    }
189    #[inline]
190    pub fn set_store_size(&mut self, store_size: i32) {
191        self.store_size = store_size;
192    }
193    #[inline]
194    pub fn set_queue_offset(&mut self, queue_offset: i64) {
195        self.queue_offset = queue_offset;
196    }
197    #[inline]
198    pub fn set_sys_flag(&mut self, sys_flag: i32) {
199        self.sys_flag = sys_flag;
200    }
201    #[inline]
202    pub fn set_born_timestamp(&mut self, born_timestamp: i64) {
203        self.born_timestamp = born_timestamp;
204    }
205    #[inline]
206    pub fn set_born_host(&mut self, born_host: SocketAddr) {
207        self.born_host = born_host;
208    }
209    #[inline]
210    pub fn set_store_timestamp(&mut self, store_timestamp: i64) {
211        self.store_timestamp = store_timestamp;
212    }
213    #[inline]
214    pub fn set_store_host(&mut self, store_host: SocketAddr) {
215        self.store_host = store_host;
216    }
217    #[inline]
218    pub fn set_msg_id(&mut self, msg_id: CheetahString) {
219        self.msg_id = msg_id;
220    }
221    #[inline]
222    pub fn set_commit_log_offset(&mut self, commit_log_offset: i64) {
223        self.commit_log_offset = commit_log_offset;
224    }
225    #[inline]
226    pub fn set_body_crc(&mut self, body_crc: u32) {
227        self.body_crc = body_crc;
228    }
229    #[inline]
230    pub fn set_reconsume_times(&mut self, reconsume_times: i32) {
231        self.reconsume_times = reconsume_times;
232    }
233    #[inline]
234    pub fn set_prepared_transaction_offset(&mut self, prepared_transaction_offset: i64) {
235        self.prepared_transaction_offset = prepared_transaction_offset;
236    }
237    #[inline]
238    pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
239        self.message.properties().as_map()
240    }
241    #[inline]
242    pub fn get_tags(&self) -> Option<CheetahString> {
243        self.message.get_tags()
244    }
245}
246
247impl Default for MessageExt {
248    fn default() -> Self {
249        Self {
250            message: Message::default(),
251            broker_name: CheetahString::default(),
252            queue_id: 0,
253            store_size: 0,
254            queue_offset: 0,
255            sys_flag: 0,
256            born_timestamp: 0,
257            born_host: "127.0.0.1:10911".parse().unwrap(),
258            store_timestamp: 0,
259            store_host: "127.0.0.1:10911".parse().unwrap(),
260            msg_id: CheetahString::default(),
261            commit_log_offset: 0,
262            body_crc: 0,
263            reconsume_times: 0,
264            prepared_transaction_offset: 0,
265        }
266    }
267}
268
269impl fmt::Display for MessageExt {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        write!(
272            f,
273            "MessageExt {{ message: {}, broker_name: {}, queue_id: {}, store_size: {}, queue_offset: {}, sys_flag: \
274             {}, born_timestamp: {}, born_host: {}, store_timestamp: {}, store_host: {}, msg_id: {}, \
275             commit_log_offset: {}, body_crc: {}, reconsume_times: {}, prepared_transaction_offset: {} }}",
276            self.message,
277            self.broker_name,
278            self.queue_id,
279            self.store_size,
280            self.queue_offset,
281            self.sys_flag,
282            self.born_timestamp,
283            self.born_host,
284            self.store_timestamp,
285            self.store_host,
286            self.msg_id,
287            self.commit_log_offset,
288            self.body_crc,
289            self.reconsume_times,
290            self.prepared_transaction_offset
291        )
292    }
293}
294impl MessageTrait for MessageExt {
295    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
296        self.message.put_property(key, value);
297    }
298
299    fn clear_property(&mut self, name: &str) {
300        self.message.clear_property(name);
301    }
302
303    fn property(&self, name: &CheetahString) -> Option<CheetahString> {
304        self.message.get_property(name)
305    }
306
307    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
308        self.message.property_ref(name)
309    }
310
311    fn topic(&self) -> &CheetahString {
312        self.message.topic()
313    }
314
315    fn set_topic(&mut self, topic: CheetahString) {
316        self.message.set_topic(topic);
317    }
318
319    fn get_flag(&self) -> i32 {
320        self.message.get_flag()
321    }
322
323    fn set_flag(&mut self, flag: i32) {
324        self.message.set_flag(flag);
325    }
326
327    fn get_body(&self) -> Option<&Bytes> {
328        self.message.get_body()
329    }
330
331    fn set_body(&mut self, body: Bytes) {
332        self.message.set_body(Some(body));
333    }
334
335    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
336        self.message.get_properties()
337    }
338
339    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
340        self.message.set_properties(properties);
341    }
342
343    #[inline]
344    fn transaction_id(&self) -> Option<&CheetahString> {
345        MessageTrait::transaction_id(&self.message)
346    }
347
348    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
349        self.message.set_transaction_id(transaction_id);
350    }
351
352    fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
353        self.message.get_compressed_body_mut()
354    }
355
356    fn get_compressed_body(&self) -> Option<&Bytes> {
357        self.message.get_compressed_body()
358    }
359
360    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
361        self.message.set_compressed_body_mut(compressed_body);
362    }
363    #[inline]
364    fn take_body(&mut self) -> Option<Bytes> {
365        self.message.take_body()
366    }
367
368    fn as_any(&self) -> &dyn Any {
369        self
370    }
371
372    fn as_any_mut(&mut self) -> &mut dyn Any {
373        self
374    }
375}
376
377// Automatic deref to Message for convenient access
378impl std::ops::Deref for MessageExt {
379    type Target = Message;
380
381    fn deref(&self) -> &Self::Target {
382        &self.message
383    }
384}
385
386impl std::ops::DerefMut for MessageExt {
387    fn deref_mut(&mut self) -> &mut Self::Target {
388        &mut self.message
389    }
390}
391
392// Conversion to new MessageEnvelope type
393impl From<MessageExt> for crate::common::message::message_envelope::MessageEnvelope {
394    fn from(ext: MessageExt) -> Self {
395        use crate::common::message::message_envelope::MessageEnvelope;
396        use crate::common::message::routing_context::RoutingContext;
397        use crate::common::message::storage_metadata::StorageMetadata;
398
399        let routing = RoutingContext::new(ext.born_host, ext.born_timestamp, ext.sys_flag);
400
401        let storage = StorageMetadata::new(
402            ext.broker_name,
403            ext.queue_id,
404            ext.queue_offset,
405            ext.commit_log_offset,
406            ext.store_timestamp,
407            ext.store_host,
408            ext.store_size,
409        );
410
411        MessageEnvelope::new(
412            ext.message,
413            routing,
414            storage,
415            ext.msg_id,
416            ext.body_crc,
417            ext.reconsume_times,
418            ext.prepared_transaction_offset,
419        )
420    }
421}
422
423// Conversion from new MessageEnvelope type
424impl From<crate::common::message::message_envelope::MessageEnvelope> for MessageExt {
425    fn from(envelope: crate::common::message::message_envelope::MessageEnvelope) -> Self {
426        Self {
427            message: envelope.message().clone(),
428            broker_name: CheetahString::from_string(envelope.broker_name().to_string()),
429            queue_id: envelope.queue_id(),
430            store_size: envelope.store_size(),
431            queue_offset: envelope.queue_offset(),
432            sys_flag: envelope.sys_flag(),
433            born_timestamp: envelope.born_timestamp(),
434            born_host: envelope.born_host(),
435            store_timestamp: envelope.store_timestamp(),
436            store_host: envelope.store_host(),
437            msg_id: envelope.msg_id().clone(),
438            commit_log_offset: envelope.commit_log_offset(),
439            body_crc: envelope.body_crc(),
440            reconsume_times: envelope.reconsume_times(),
441            prepared_transaction_offset: envelope.prepared_transaction_offset(),
442        }
443    }
444}
445
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::*;
    use crate::common::message::MessageTrait;

    /// Setting a transaction id and reading it back through the `MessageTrait`
    /// implementation of `MessageExt` yields a reference to an equal `CheetahString`.
    #[test]
    fn message_ext_message_trait_transaction_id_returns_cheetah_string_ref() {
        let expected = CheetahString::from_static_str("tx-123");

        let mut message_ext = MessageExt::default();
        message_ext.set_transaction_id(CheetahString::from_static_str("tx-123"));

        // Fully qualified so the trait method is the one under test.
        let transaction_id = <MessageExt as MessageTrait>::transaction_id(&message_ext);

        assert_eq!(transaction_id, Some(&expected));
    }
}