rocketmq_common/common/message/
message_ext.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::net::SocketAddr;
23
24use bytes::Buf;
25use bytes::BufMut;
26use bytes::Bytes;
27use cheetah_string::CheetahString;
28
29use crate::common::message::message_single::Message;
30use crate::common::message::MessageTrait;
31use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
32
33#[derive(Clone, Debug)]
34pub struct MessageExt {
35    pub message: Message,
36    pub broker_name: CheetahString,
37    pub queue_id: i32,
38    pub store_size: i32,
39    pub queue_offset: i64,
40    pub sys_flag: i32,
41    pub born_timestamp: i64,
42    pub born_host: SocketAddr,
43    pub store_timestamp: i64,
44    pub store_host: SocketAddr,
45    pub msg_id: CheetahString,
46    pub commit_log_offset: i64,
47    pub body_crc: u32,
48    pub reconsume_times: i32,
49    pub prepared_transaction_offset: i64,
50}
51
52impl MessageExt {
53    pub fn socket_address_2_byte_buffer(ip: &SocketAddr) -> bytes::Bytes {
54        match ip {
55            SocketAddr::V4(value) => {
56                let mut byte_buffer = bytes::BytesMut::with_capacity(4 + 4);
57                byte_buffer.put_slice(&value.ip().octets());
58                byte_buffer.put_i32(value.port() as i32);
59                byte_buffer.copy_to_bytes(byte_buffer.len())
60            }
61            SocketAddr::V6(value) => {
62                let mut byte_buffer = bytes::BytesMut::with_capacity(16 + 4);
63                byte_buffer.put_slice(&value.ip().octets());
64                byte_buffer.put_i32(value.port() as i32);
65                byte_buffer.copy_to_bytes(byte_buffer.len())
66            }
67        }
68    }
69
70    #[inline]
71    pub fn born_host_bytes(&self) -> bytes::Bytes {
72        Self::socket_address_2_byte_buffer(&self.born_host)
73    }
74
75    #[inline]
76    pub fn born_store_bytes(&self) -> bytes::Bytes {
77        Self::socket_address_2_byte_buffer(&self.store_host)
78    }
79
80    #[inline]
81    pub fn topic(&self) -> &CheetahString {
82        self.message.topic()
83    }
84
85    #[inline]
86    pub fn born_host(&self) -> SocketAddr {
87        self.born_host
88    }
89
90    #[inline]
91    pub fn store_host(&self) -> SocketAddr {
92        self.store_host
93    }
94
95    #[inline]
96    pub fn with_born_host_v6_flag(&mut self) {
97        self.sys_flag |= MessageSysFlag::BORNHOST_V6_FLAG;
98    }
99
100    #[inline]
101    pub fn with_store_host_v6_flag(&mut self) {
102        self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
103    }
104
105    #[inline]
106    pub fn body(&self) -> Option<bytes::Bytes> {
107        self.message.body()
108    }
109
110    #[inline]
111    pub fn sys_flag(&self) -> i32 {
112        self.sys_flag
113    }
114
115    #[inline]
116    pub fn body_crc(&self) -> u32 {
117        self.body_crc
118    }
119
120    #[inline]
121    pub fn queue_id(&self) -> i32 {
122        self.queue_id
123    }
124
125    #[inline]
126    pub fn flag(&self) -> i32 {
127        self.message.flag()
128    }
129
130    #[inline]
131    pub fn message_inner(&self) -> &Message {
132        &self.message
133    }
134
135    #[inline]
136    pub fn broker_name(&self) -> &str {
137        &self.broker_name
138    }
139
140    #[inline]
141    pub fn store_size(&self) -> i32 {
142        self.store_size
143    }
144
145    #[inline]
146    pub fn queue_offset(&self) -> i64 {
147        self.queue_offset
148    }
149
150    #[inline]
151    pub fn born_timestamp(&self) -> i64 {
152        self.born_timestamp
153    }
154
155    #[inline]
156    pub fn store_timestamp(&self) -> i64 {
157        self.store_timestamp
158    }
159
160    #[inline]
161    pub fn msg_id(&self) -> &CheetahString {
162        &self.msg_id
163    }
164
165    #[inline]
166    pub fn commit_log_offset(&self) -> i64 {
167        self.commit_log_offset
168    }
169
170    #[inline]
171    pub fn reconsume_times(&self) -> i32 {
172        self.reconsume_times
173    }
174
175    #[inline]
176    pub fn prepared_transaction_offset(&self) -> i64 {
177        self.prepared_transaction_offset
178    }
179
180    pub fn set_message_inner(&mut self, message_inner: Message) {
181        self.message = message_inner;
182    }
183
184    pub fn set_broker_name(&mut self, broker_name: CheetahString) {
185        self.broker_name = broker_name;
186    }
187
188    pub fn set_queue_id(&mut self, queue_id: i32) {
189        self.queue_id = queue_id;
190    }
191
192    pub fn set_store_size(&mut self, store_size: i32) {
193        self.store_size = store_size;
194    }
195
196    pub fn set_queue_offset(&mut self, queue_offset: i64) {
197        self.queue_offset = queue_offset;
198    }
199
200    pub fn set_sys_flag(&mut self, sys_flag: i32) {
201        self.sys_flag = sys_flag;
202    }
203
204    pub fn set_born_timestamp(&mut self, born_timestamp: i64) {
205        self.born_timestamp = born_timestamp;
206    }
207
208    pub fn set_born_host(&mut self, born_host: SocketAddr) {
209        self.born_host = born_host;
210    }
211
212    pub fn set_store_timestamp(&mut self, store_timestamp: i64) {
213        self.store_timestamp = store_timestamp;
214    }
215
216    pub fn set_store_host(&mut self, store_host: SocketAddr) {
217        self.store_host = store_host;
218    }
219
220    pub fn set_msg_id(&mut self, msg_id: CheetahString) {
221        self.msg_id = msg_id;
222    }
223
224    pub fn set_commit_log_offset(&mut self, commit_log_offset: i64) {
225        self.commit_log_offset = commit_log_offset;
226    }
227
228    pub fn set_body_crc(&mut self, body_crc: u32) {
229        self.body_crc = body_crc;
230    }
231
232    pub fn set_reconsume_times(&mut self, reconsume_times: i32) {
233        self.reconsume_times = reconsume_times;
234    }
235
236    pub fn set_prepared_transaction_offset(&mut self, prepared_transaction_offset: i64) {
237        self.prepared_transaction_offset = prepared_transaction_offset;
238    }
239
240    pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
241        self.message.properties()
242    }
243
244    pub fn get_tags(&self) -> Option<CheetahString> {
245        self.message.get_tags()
246    }
247}
248
249impl Default for MessageExt {
250    fn default() -> Self {
251        Self {
252            message: Message::default(),
253            broker_name: CheetahString::default(),
254            queue_id: 0,
255            store_size: 0,
256            queue_offset: 0,
257            sys_flag: 0,
258            born_timestamp: 0,
259            born_host: "127.0.0.1:10911".parse().unwrap(),
260            store_timestamp: 0,
261            store_host: "127.0.0.1:10911".parse().unwrap(),
262            msg_id: CheetahString::default(),
263            commit_log_offset: 0,
264            body_crc: 0,
265            reconsume_times: 0,
266            prepared_transaction_offset: 0,
267        }
268    }
269}
270
271impl fmt::Display for MessageExt {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        write!(
274            f,
275            "MessageExt {{ message: {}, broker_name: {}, queue_id: {}, store_size: {}, \
276             queue_offset: {}, sys_flag: {}, born_timestamp: {}, born_host: {}, store_timestamp: \
277             {}, store_host: {}, msg_id: {}, commit_log_offset: {}, body_crc: {}, \
278             reconsume_times: {}, prepared_transaction_offset: {} }}",
279            self.message,
280            self.broker_name,
281            self.queue_id,
282            self.store_size,
283            self.queue_offset,
284            self.sys_flag,
285            self.born_timestamp,
286            self.born_host,
287            self.store_timestamp,
288            self.store_host,
289            self.msg_id,
290            self.commit_log_offset,
291            self.body_crc,
292            self.reconsume_times,
293            self.prepared_transaction_offset
294        )
295    }
296}
297impl MessageTrait for MessageExt {
298    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
299        self.message.put_property(key, value);
300    }
301
302    fn clear_property(&mut self, name: &str) {
303        self.message.clear_property(name);
304    }
305
306    fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
307        self.message.get_property(name)
308    }
309
310    fn get_topic(&self) -> &CheetahString {
311        self.message.get_topic()
312    }
313
314    fn set_topic(&mut self, topic: CheetahString) {
315        self.message.set_topic(topic);
316    }
317
318    fn get_flag(&self) -> i32 {
319        self.message.get_flag()
320    }
321
322    fn set_flag(&mut self, flag: i32) {
323        self.message.set_flag(flag);
324    }
325
326    fn get_body(&self) -> Option<&Bytes> {
327        self.message.get_body()
328    }
329
330    fn set_body(&mut self, body: Bytes) {
331        self.message.set_body(body);
332    }
333
334    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
335        self.message.get_properties()
336    }
337
338    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
339        self.message.set_properties(properties);
340    }
341
342    #[inline]
343    fn get_transaction_id(&self) -> Option<&CheetahString> {
344        self.message.get_transaction_id()
345    }
346
347    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
348        self.message.set_transaction_id(transaction_id);
349    }
350
351    fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
352        self.message.get_compressed_body_mut()
353    }
354
355    fn get_compressed_body(&self) -> Option<&Bytes> {
356        self.message.get_compressed_body()
357    }
358
359    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
360        self.message.set_compressed_body_mut(compressed_body);
361    }
362
363    #[inline]
364    fn take_body(&mut self) -> Option<Bytes> {
365        self.message.take_body()
366    }
367
368    fn as_any(&self) -> &dyn Any {
369        self
370    }
371
372    fn as_any_mut(&mut self) -> &mut dyn Any {
373        self
374    }
375}