rocketmq_common/common/message/
message_ext.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Display;
19use std::fmt::Formatter;
20use std::net::SocketAddr;
21
22use bytes::Buf;
23use bytes::BufMut;
24use bytes::Bytes;
25use cheetah_string::CheetahString;
26
27use crate::common::message::message_single::Message;
28use crate::common::message::MessageTrait;
29use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
30
31#[derive(Clone, Debug)]
32pub struct MessageExt {
33 pub message: Message,
34 pub broker_name: CheetahString,
35 pub queue_id: i32,
36 pub store_size: i32,
37 pub queue_offset: i64,
38 pub sys_flag: i32,
39 pub born_timestamp: i64,
40 pub born_host: SocketAddr,
41 pub store_timestamp: i64,
42 pub store_host: SocketAddr,
43 pub msg_id: CheetahString,
44 pub commit_log_offset: i64,
45 pub body_crc: u32,
46 pub reconsume_times: i32,
47 pub prepared_transaction_offset: i64,
48}
49
50impl MessageExt {
51 pub fn socket_address_2_byte_buffer(ip: &SocketAddr) -> bytes::Bytes {
52 match ip {
53 SocketAddr::V4(value) => {
54 let mut byte_buffer = bytes::BytesMut::with_capacity(4 + 4);
55 byte_buffer.put_slice(&value.ip().octets());
56 byte_buffer.put_i32(value.port() as i32);
57 byte_buffer.copy_to_bytes(byte_buffer.len())
58 }
59 SocketAddr::V6(value) => {
60 let mut byte_buffer = bytes::BytesMut::with_capacity(16 + 4);
61 byte_buffer.put_slice(&value.ip().octets());
62 byte_buffer.put_i32(value.port() as i32);
63 byte_buffer.copy_to_bytes(byte_buffer.len())
64 }
65 }
66 }
67
68 #[inline]
69 pub fn born_host_bytes(&self) -> bytes::Bytes {
70 Self::socket_address_2_byte_buffer(&self.born_host)
71 }
72
73 #[inline]
74 pub fn born_store_bytes(&self) -> bytes::Bytes {
75 Self::socket_address_2_byte_buffer(&self.store_host)
76 }
77
78 #[inline]
79 pub fn topic(&self) -> &CheetahString {
80 self.message.topic()
81 }
82
83 #[inline]
84 pub fn born_host(&self) -> SocketAddr {
85 self.born_host
86 }
87
88 #[inline]
89 pub fn store_host(&self) -> SocketAddr {
90 self.store_host
91 }
92
93 #[inline]
94 pub fn with_born_host_v6_flag(&mut self) {
95 self.sys_flag |= MessageSysFlag::BORNHOST_V6_FLAG;
96 }
97
98 #[inline]
99 pub fn with_store_host_v6_flag(&mut self) {
100 self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
101 }
102
103 #[inline]
104 pub fn body(&self) -> Option<bytes::Bytes> {
105 self.message.body()
106 }
107
108 #[inline]
109 pub fn sys_flag(&self) -> i32 {
110 self.sys_flag
111 }
112
113 #[inline]
114 pub fn body_crc(&self) -> u32 {
115 self.body_crc
116 }
117
118 #[inline]
119 pub fn queue_id(&self) -> i32 {
120 self.queue_id
121 }
122
123 #[inline]
124 pub fn flag(&self) -> i32 {
125 self.message.flag()
126 }
127
128 #[inline]
129 pub fn message_inner(&self) -> &Message {
130 &self.message
131 }
132
133 #[inline]
134 pub fn broker_name(&self) -> &str {
135 &self.broker_name
136 }
137
138 #[inline]
139 pub fn store_size(&self) -> i32 {
140 self.store_size
141 }
142
143 #[inline]
144 pub fn queue_offset(&self) -> i64 {
145 self.queue_offset
146 }
147
148 #[inline]
149 pub fn born_timestamp(&self) -> i64 {
150 self.born_timestamp
151 }
152
153 #[inline]
154 pub fn store_timestamp(&self) -> i64 {
155 self.store_timestamp
156 }
157
158 #[inline]
159 pub fn msg_id(&self) -> &CheetahString {
160 &self.msg_id
161 }
162
163 #[inline]
164 pub fn commit_log_offset(&self) -> i64 {
165 self.commit_log_offset
166 }
167
168 #[inline]
169 pub fn reconsume_times(&self) -> i32 {
170 self.reconsume_times
171 }
172
173 #[inline]
174 pub fn prepared_transaction_offset(&self) -> i64 {
175 self.prepared_transaction_offset
176 }
177 #[inline]
178 pub fn set_message_inner(&mut self, message_inner: Message) {
179 self.message = message_inner;
180 }
181 #[inline]
182 pub fn set_broker_name(&mut self, broker_name: CheetahString) {
183 self.broker_name = broker_name;
184 }
185 #[inline]
186 pub fn set_queue_id(&mut self, queue_id: i32) {
187 self.queue_id = queue_id;
188 }
189 #[inline]
190 pub fn set_store_size(&mut self, store_size: i32) {
191 self.store_size = store_size;
192 }
193 #[inline]
194 pub fn set_queue_offset(&mut self, queue_offset: i64) {
195 self.queue_offset = queue_offset;
196 }
197 #[inline]
198 pub fn set_sys_flag(&mut self, sys_flag: i32) {
199 self.sys_flag = sys_flag;
200 }
201 #[inline]
202 pub fn set_born_timestamp(&mut self, born_timestamp: i64) {
203 self.born_timestamp = born_timestamp;
204 }
205 #[inline]
206 pub fn set_born_host(&mut self, born_host: SocketAddr) {
207 self.born_host = born_host;
208 }
209 #[inline]
210 pub fn set_store_timestamp(&mut self, store_timestamp: i64) {
211 self.store_timestamp = store_timestamp;
212 }
213 #[inline]
214 pub fn set_store_host(&mut self, store_host: SocketAddr) {
215 self.store_host = store_host;
216 }
217 #[inline]
218 pub fn set_msg_id(&mut self, msg_id: CheetahString) {
219 self.msg_id = msg_id;
220 }
221 #[inline]
222 pub fn set_commit_log_offset(&mut self, commit_log_offset: i64) {
223 self.commit_log_offset = commit_log_offset;
224 }
225 #[inline]
226 pub fn set_body_crc(&mut self, body_crc: u32) {
227 self.body_crc = body_crc;
228 }
229 #[inline]
230 pub fn set_reconsume_times(&mut self, reconsume_times: i32) {
231 self.reconsume_times = reconsume_times;
232 }
233 #[inline]
234 pub fn set_prepared_transaction_offset(&mut self, prepared_transaction_offset: i64) {
235 self.prepared_transaction_offset = prepared_transaction_offset;
236 }
237 #[inline]
238 pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
239 self.message.properties().as_map()
240 }
241 #[inline]
242 pub fn get_tags(&self) -> Option<CheetahString> {
243 self.message.get_tags()
244 }
245}
246
247impl Default for MessageExt {
248 fn default() -> Self {
249 Self {
250 message: Message::default(),
251 broker_name: CheetahString::default(),
252 queue_id: 0,
253 store_size: 0,
254 queue_offset: 0,
255 sys_flag: 0,
256 born_timestamp: 0,
257 born_host: "127.0.0.1:10911".parse().unwrap(),
258 store_timestamp: 0,
259 store_host: "127.0.0.1:10911".parse().unwrap(),
260 msg_id: CheetahString::default(),
261 commit_log_offset: 0,
262 body_crc: 0,
263 reconsume_times: 0,
264 prepared_transaction_offset: 0,
265 }
266 }
267}
268
269impl fmt::Display for MessageExt {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 write!(
272 f,
273 "MessageExt {{ message: {}, broker_name: {}, queue_id: {}, store_size: {}, queue_offset: {}, sys_flag: \
274 {}, born_timestamp: {}, born_host: {}, store_timestamp: {}, store_host: {}, msg_id: {}, \
275 commit_log_offset: {}, body_crc: {}, reconsume_times: {}, prepared_transaction_offset: {} }}",
276 self.message,
277 self.broker_name,
278 self.queue_id,
279 self.store_size,
280 self.queue_offset,
281 self.sys_flag,
282 self.born_timestamp,
283 self.born_host,
284 self.store_timestamp,
285 self.store_host,
286 self.msg_id,
287 self.commit_log_offset,
288 self.body_crc,
289 self.reconsume_times,
290 self.prepared_transaction_offset
291 )
292 }
293}
294impl MessageTrait for MessageExt {
295 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
296 self.message.put_property(key, value);
297 }
298
299 fn clear_property(&mut self, name: &str) {
300 self.message.clear_property(name);
301 }
302
303 fn property(&self, name: &CheetahString) -> Option<CheetahString> {
304 self.message.get_property(name)
305 }
306
307 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
308 self.message.property_ref(name)
309 }
310
311 fn topic(&self) -> &CheetahString {
312 self.message.topic()
313 }
314
315 fn set_topic(&mut self, topic: CheetahString) {
316 self.message.set_topic(topic);
317 }
318
319 fn get_flag(&self) -> i32 {
320 self.message.get_flag()
321 }
322
323 fn set_flag(&mut self, flag: i32) {
324 self.message.set_flag(flag);
325 }
326
327 fn get_body(&self) -> Option<&Bytes> {
328 self.message.get_body()
329 }
330
331 fn set_body(&mut self, body: Bytes) {
332 self.message.set_body(Some(body));
333 }
334
335 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
336 self.message.get_properties()
337 }
338
339 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
340 self.message.set_properties(properties);
341 }
342
343 #[inline]
344 fn transaction_id(&self) -> Option<&CheetahString> {
345 MessageTrait::transaction_id(&self.message)
346 }
347
348 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
349 self.message.set_transaction_id(transaction_id);
350 }
351
352 fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
353 self.message.get_compressed_body_mut()
354 }
355
356 fn get_compressed_body(&self) -> Option<&Bytes> {
357 self.message.get_compressed_body()
358 }
359
360 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
361 self.message.set_compressed_body_mut(compressed_body);
362 }
363 #[inline]
364 fn take_body(&mut self) -> Option<Bytes> {
365 self.message.take_body()
366 }
367
368 fn as_any(&self) -> &dyn Any {
369 self
370 }
371
372 fn as_any_mut(&mut self) -> &mut dyn Any {
373 self
374 }
375}
376
377impl std::ops::Deref for MessageExt {
379 type Target = Message;
380
381 fn deref(&self) -> &Self::Target {
382 &self.message
383 }
384}
385
386impl std::ops::DerefMut for MessageExt {
387 fn deref_mut(&mut self) -> &mut Self::Target {
388 &mut self.message
389 }
390}
391
392impl From<MessageExt> for crate::common::message::message_envelope::MessageEnvelope {
394 fn from(ext: MessageExt) -> Self {
395 use crate::common::message::message_envelope::MessageEnvelope;
396 use crate::common::message::routing_context::RoutingContext;
397 use crate::common::message::storage_metadata::StorageMetadata;
398
399 let routing = RoutingContext::new(ext.born_host, ext.born_timestamp, ext.sys_flag);
400
401 let storage = StorageMetadata::new(
402 ext.broker_name,
403 ext.queue_id,
404 ext.queue_offset,
405 ext.commit_log_offset,
406 ext.store_timestamp,
407 ext.store_host,
408 ext.store_size,
409 );
410
411 MessageEnvelope::new(
412 ext.message,
413 routing,
414 storage,
415 ext.msg_id,
416 ext.body_crc,
417 ext.reconsume_times,
418 ext.prepared_transaction_offset,
419 )
420 }
421}
422
423impl From<crate::common::message::message_envelope::MessageEnvelope> for MessageExt {
425 fn from(envelope: crate::common::message::message_envelope::MessageEnvelope) -> Self {
426 Self {
427 message: envelope.message().clone(),
428 broker_name: CheetahString::from_string(envelope.broker_name().to_string()),
429 queue_id: envelope.queue_id(),
430 store_size: envelope.store_size(),
431 queue_offset: envelope.queue_offset(),
432 sys_flag: envelope.sys_flag(),
433 born_timestamp: envelope.born_timestamp(),
434 born_host: envelope.born_host(),
435 store_timestamp: envelope.store_timestamp(),
436 store_host: envelope.store_host(),
437 msg_id: envelope.msg_id().clone(),
438 commit_log_offset: envelope.commit_log_offset(),
439 body_crc: envelope.body_crc(),
440 reconsume_times: envelope.reconsume_times(),
441 prepared_transaction_offset: envelope.prepared_transaction_offset(),
442 }
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use cheetah_string::CheetahString;
449
450 use super::*;
451 use crate::common::message::MessageTrait;
452
453 #[test]
454 fn message_ext_message_trait_transaction_id_returns_cheetah_string_ref() {
455 let mut message_ext = MessageExt::default();
456 message_ext.set_transaction_id(CheetahString::from_static_str("tx-123"));
457
458 let transaction_id = <MessageExt as MessageTrait>::transaction_id(&message_ext);
459
460 assert_eq!(transaction_id, Some(&CheetahString::from_static_str("tx-123")));
461 }
462}