rocketmq_common/common/message/
message_ext.rs1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::net::SocketAddr;
23
24use bytes::Buf;
25use bytes::BufMut;
26use bytes::Bytes;
27use cheetah_string::CheetahString;
28
29use crate::common::message::message_single::Message;
30use crate::common::message::MessageTrait;
31use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
32
33#[derive(Clone, Debug)]
34pub struct MessageExt {
35 pub message: Message,
36 pub broker_name: CheetahString,
37 pub queue_id: i32,
38 pub store_size: i32,
39 pub queue_offset: i64,
40 pub sys_flag: i32,
41 pub born_timestamp: i64,
42 pub born_host: SocketAddr,
43 pub store_timestamp: i64,
44 pub store_host: SocketAddr,
45 pub msg_id: CheetahString,
46 pub commit_log_offset: i64,
47 pub body_crc: u32,
48 pub reconsume_times: i32,
49 pub prepared_transaction_offset: i64,
50}
51
52impl MessageExt {
53 pub fn socket_address_2_byte_buffer(ip: &SocketAddr) -> bytes::Bytes {
54 match ip {
55 SocketAddr::V4(value) => {
56 let mut byte_buffer = bytes::BytesMut::with_capacity(4 + 4);
57 byte_buffer.put_slice(&value.ip().octets());
58 byte_buffer.put_i32(value.port() as i32);
59 byte_buffer.copy_to_bytes(byte_buffer.len())
60 }
61 SocketAddr::V6(value) => {
62 let mut byte_buffer = bytes::BytesMut::with_capacity(16 + 4);
63 byte_buffer.put_slice(&value.ip().octets());
64 byte_buffer.put_i32(value.port() as i32);
65 byte_buffer.copy_to_bytes(byte_buffer.len())
66 }
67 }
68 }
69
70 #[inline]
71 pub fn born_host_bytes(&self) -> bytes::Bytes {
72 Self::socket_address_2_byte_buffer(&self.born_host)
73 }
74
75 #[inline]
76 pub fn born_store_bytes(&self) -> bytes::Bytes {
77 Self::socket_address_2_byte_buffer(&self.store_host)
78 }
79
80 #[inline]
81 pub fn topic(&self) -> &CheetahString {
82 self.message.topic()
83 }
84
85 #[inline]
86 pub fn born_host(&self) -> SocketAddr {
87 self.born_host
88 }
89
90 #[inline]
91 pub fn store_host(&self) -> SocketAddr {
92 self.store_host
93 }
94
95 #[inline]
96 pub fn with_born_host_v6_flag(&mut self) {
97 self.sys_flag |= MessageSysFlag::BORNHOST_V6_FLAG;
98 }
99
100 #[inline]
101 pub fn with_store_host_v6_flag(&mut self) {
102 self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
103 }
104
105 #[inline]
106 pub fn body(&self) -> Option<bytes::Bytes> {
107 self.message.body()
108 }
109
110 #[inline]
111 pub fn sys_flag(&self) -> i32 {
112 self.sys_flag
113 }
114
115 #[inline]
116 pub fn body_crc(&self) -> u32 {
117 self.body_crc
118 }
119
120 #[inline]
121 pub fn queue_id(&self) -> i32 {
122 self.queue_id
123 }
124
125 #[inline]
126 pub fn flag(&self) -> i32 {
127 self.message.flag()
128 }
129
130 #[inline]
131 pub fn message_inner(&self) -> &Message {
132 &self.message
133 }
134
135 #[inline]
136 pub fn broker_name(&self) -> &str {
137 &self.broker_name
138 }
139
140 #[inline]
141 pub fn store_size(&self) -> i32 {
142 self.store_size
143 }
144
145 #[inline]
146 pub fn queue_offset(&self) -> i64 {
147 self.queue_offset
148 }
149
150 #[inline]
151 pub fn born_timestamp(&self) -> i64 {
152 self.born_timestamp
153 }
154
155 #[inline]
156 pub fn store_timestamp(&self) -> i64 {
157 self.store_timestamp
158 }
159
160 #[inline]
161 pub fn msg_id(&self) -> &CheetahString {
162 &self.msg_id
163 }
164
165 #[inline]
166 pub fn commit_log_offset(&self) -> i64 {
167 self.commit_log_offset
168 }
169
170 #[inline]
171 pub fn reconsume_times(&self) -> i32 {
172 self.reconsume_times
173 }
174
175 #[inline]
176 pub fn prepared_transaction_offset(&self) -> i64 {
177 self.prepared_transaction_offset
178 }
179
180 pub fn set_message_inner(&mut self, message_inner: Message) {
181 self.message = message_inner;
182 }
183
184 pub fn set_broker_name(&mut self, broker_name: CheetahString) {
185 self.broker_name = broker_name;
186 }
187
188 pub fn set_queue_id(&mut self, queue_id: i32) {
189 self.queue_id = queue_id;
190 }
191
192 pub fn set_store_size(&mut self, store_size: i32) {
193 self.store_size = store_size;
194 }
195
196 pub fn set_queue_offset(&mut self, queue_offset: i64) {
197 self.queue_offset = queue_offset;
198 }
199
200 pub fn set_sys_flag(&mut self, sys_flag: i32) {
201 self.sys_flag = sys_flag;
202 }
203
204 pub fn set_born_timestamp(&mut self, born_timestamp: i64) {
205 self.born_timestamp = born_timestamp;
206 }
207
208 pub fn set_born_host(&mut self, born_host: SocketAddr) {
209 self.born_host = born_host;
210 }
211
212 pub fn set_store_timestamp(&mut self, store_timestamp: i64) {
213 self.store_timestamp = store_timestamp;
214 }
215
216 pub fn set_store_host(&mut self, store_host: SocketAddr) {
217 self.store_host = store_host;
218 }
219
220 pub fn set_msg_id(&mut self, msg_id: CheetahString) {
221 self.msg_id = msg_id;
222 }
223
224 pub fn set_commit_log_offset(&mut self, commit_log_offset: i64) {
225 self.commit_log_offset = commit_log_offset;
226 }
227
228 pub fn set_body_crc(&mut self, body_crc: u32) {
229 self.body_crc = body_crc;
230 }
231
232 pub fn set_reconsume_times(&mut self, reconsume_times: i32) {
233 self.reconsume_times = reconsume_times;
234 }
235
236 pub fn set_prepared_transaction_offset(&mut self, prepared_transaction_offset: i64) {
237 self.prepared_transaction_offset = prepared_transaction_offset;
238 }
239
240 pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
241 self.message.properties()
242 }
243
244 pub fn get_tags(&self) -> Option<CheetahString> {
245 self.message.get_tags()
246 }
247}
248
249impl Default for MessageExt {
250 fn default() -> Self {
251 Self {
252 message: Message::default(),
253 broker_name: CheetahString::default(),
254 queue_id: 0,
255 store_size: 0,
256 queue_offset: 0,
257 sys_flag: 0,
258 born_timestamp: 0,
259 born_host: "127.0.0.1:10911".parse().unwrap(),
260 store_timestamp: 0,
261 store_host: "127.0.0.1:10911".parse().unwrap(),
262 msg_id: CheetahString::default(),
263 commit_log_offset: 0,
264 body_crc: 0,
265 reconsume_times: 0,
266 prepared_transaction_offset: 0,
267 }
268 }
269}
270
271impl fmt::Display for MessageExt {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 write!(
274 f,
275 "MessageExt {{ message: {}, broker_name: {}, queue_id: {}, store_size: {}, \
276 queue_offset: {}, sys_flag: {}, born_timestamp: {}, born_host: {}, store_timestamp: \
277 {}, store_host: {}, msg_id: {}, commit_log_offset: {}, body_crc: {}, \
278 reconsume_times: {}, prepared_transaction_offset: {} }}",
279 self.message,
280 self.broker_name,
281 self.queue_id,
282 self.store_size,
283 self.queue_offset,
284 self.sys_flag,
285 self.born_timestamp,
286 self.born_host,
287 self.store_timestamp,
288 self.store_host,
289 self.msg_id,
290 self.commit_log_offset,
291 self.body_crc,
292 self.reconsume_times,
293 self.prepared_transaction_offset
294 )
295 }
296}
297impl MessageTrait for MessageExt {
298 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
299 self.message.put_property(key, value);
300 }
301
302 fn clear_property(&mut self, name: &str) {
303 self.message.clear_property(name);
304 }
305
306 fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
307 self.message.get_property(name)
308 }
309
310 fn get_topic(&self) -> &CheetahString {
311 self.message.get_topic()
312 }
313
314 fn set_topic(&mut self, topic: CheetahString) {
315 self.message.set_topic(topic);
316 }
317
318 fn get_flag(&self) -> i32 {
319 self.message.get_flag()
320 }
321
322 fn set_flag(&mut self, flag: i32) {
323 self.message.set_flag(flag);
324 }
325
326 fn get_body(&self) -> Option<&Bytes> {
327 self.message.get_body()
328 }
329
330 fn set_body(&mut self, body: Bytes) {
331 self.message.set_body(body);
332 }
333
334 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
335 self.message.get_properties()
336 }
337
338 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
339 self.message.set_properties(properties);
340 }
341
342 #[inline]
343 fn get_transaction_id(&self) -> Option<&CheetahString> {
344 self.message.get_transaction_id()
345 }
346
347 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
348 self.message.set_transaction_id(transaction_id);
349 }
350
351 fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
352 self.message.get_compressed_body_mut()
353 }
354
355 fn get_compressed_body(&self) -> Option<&Bytes> {
356 self.message.get_compressed_body()
357 }
358
359 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
360 self.message.set_compressed_body_mut(compressed_body);
361 }
362
363 #[inline]
364 fn take_body(&mut self) -> Option<Bytes> {
365 self.message.take_body()
366 }
367
368 fn as_any(&self) -> &dyn Any {
369 self
370 }
371
372 fn as_any_mut(&mut self) -> &mut dyn Any {
373 self
374 }
375}