rocketmq_common/common/message/
message_envelope.rs1use std::collections::HashMap;
16use std::fmt;
17use std::fmt::Display;
18use std::net::SocketAddr;
19
20use bytes::Bytes;
21use cheetah_string::CheetahString;
22
23use crate::common::message::message_client_id_setter::MessageClientIDSetter;
24use crate::common::message::message_single::Message;
25use crate::common::message::routing_context::RoutingContext;
26use crate::common::message::storage_metadata::StorageMetadata;
27use crate::common::message::MessageTrait;
28
29#[derive(Clone, Debug)]
40pub struct MessageEnvelope {
41 message: Message,
43
44 routing: RoutingContext,
46
47 storage: StorageMetadata,
49
50 msg_id: CheetahString,
52
53 body_crc: u32,
55
56 reconsume_times: i32,
58
59 prepared_transaction_offset: i64,
61}
62
63impl MessageEnvelope {
64 #[allow(clippy::too_many_arguments)]
66 pub fn new(
67 message: Message,
68 routing: RoutingContext,
69 storage: StorageMetadata,
70 msg_id: CheetahString,
71 body_crc: u32,
72 reconsume_times: i32,
73 prepared_transaction_offset: i64,
74 ) -> Self {
75 Self {
76 message,
77 routing,
78 storage,
79 msg_id,
80 body_crc,
81 reconsume_times,
82 prepared_transaction_offset,
83 }
84 }
85
86 #[inline]
90 pub fn message(&self) -> &Message {
91 &self.message
92 }
93
94 #[inline]
96 pub fn message_mut(&mut self) -> &mut Message {
97 &mut self.message
98 }
99
100 #[inline]
102 pub fn topic(&self) -> &CheetahString {
103 self.message.topic()
104 }
105
106 #[inline]
108 pub fn body(&self) -> Option<Bytes> {
109 self.message.body()
110 }
111
112 #[inline]
114 pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
115 self.message.properties().as_map()
116 }
117
118 #[inline]
120 pub fn tags(&self) -> Option<CheetahString> {
121 self.message.get_tags()
122 }
123
124 #[inline]
126 pub fn keys(&self) -> Option<Vec<String>> {
127 self.message.keys()
128 }
129
130 #[inline]
132 pub fn flag(&self) -> i32 {
133 self.message.flag()
134 }
135
136 #[inline]
140 pub fn routing(&self) -> &RoutingContext {
141 &self.routing
142 }
143
144 #[inline]
146 pub(crate) fn routing_mut(&mut self) -> &mut RoutingContext {
147 &mut self.routing
148 }
149
150 #[inline]
152 pub fn born_host(&self) -> SocketAddr {
153 self.routing.born_host()
154 }
155
156 #[inline]
158 pub fn born_timestamp(&self) -> i64 {
159 self.routing.born_timestamp()
160 }
161
162 #[inline]
164 pub fn sys_flag(&self) -> i32 {
165 self.routing.sys_flag()
166 }
167
168 #[inline]
170 pub fn born_host_bytes(&self) -> Bytes {
171 self.routing.born_host_bytes()
172 }
173
174 #[inline]
178 pub fn storage(&self) -> &StorageMetadata {
179 &self.storage
180 }
181
182 #[inline]
184 pub(crate) fn storage_mut(&mut self) -> &mut StorageMetadata {
185 &mut self.storage
186 }
187
188 #[inline]
190 pub fn broker_name(&self) -> &str {
191 self.storage.broker_name()
192 }
193
194 #[inline]
196 pub fn queue_id(&self) -> i32 {
197 self.storage.queue_id()
198 }
199
200 #[inline]
202 pub fn queue_offset(&self) -> i64 {
203 self.storage.queue_offset()
204 }
205
206 #[inline]
208 pub fn commit_log_offset(&self) -> i64 {
209 self.storage.commit_log_offset()
210 }
211
212 #[inline]
214 pub fn store_timestamp(&self) -> i64 {
215 self.storage.store_timestamp()
216 }
217
218 #[inline]
220 pub fn store_host(&self) -> SocketAddr {
221 self.storage.store_host()
222 }
223
224 #[inline]
226 pub fn store_size(&self) -> i32 {
227 self.storage.store_size()
228 }
229
230 #[inline]
232 pub fn store_host_bytes(&self) -> Bytes {
233 self.storage.store_host_bytes()
234 }
235
236 #[inline]
240 pub fn msg_id(&self) -> &CheetahString {
241 &self.msg_id
242 }
243
244 pub fn client_msg_id(&self) -> CheetahString {
248 MessageClientIDSetter::get_uniq_id(self).unwrap_or_else(|| self.msg_id.clone())
249 }
250
251 #[inline]
253 pub fn body_crc(&self) -> u32 {
254 self.body_crc
255 }
256
257 #[inline]
259 pub fn reconsume_times(&self) -> i32 {
260 self.reconsume_times
261 }
262
263 #[inline]
265 pub fn prepared_transaction_offset(&self) -> i64 {
266 self.prepared_transaction_offset
267 }
268
269 #[inline]
273 pub(crate) fn set_msg_id(&mut self, msg_id: CheetahString) {
274 self.msg_id = msg_id;
275 }
276
277 #[inline]
279 pub(crate) fn set_reconsume_times(&mut self, reconsume_times: i32) {
280 self.reconsume_times = reconsume_times;
281 }
282
283 #[inline]
285 pub(crate) fn set_body_crc(&mut self, body_crc: u32) {
286 self.body_crc = body_crc;
287 }
288
289 #[inline]
291 pub(crate) fn set_prepared_transaction_offset(&mut self, offset: i64) {
292 self.prepared_transaction_offset = offset;
293 }
294
295 #[inline]
297 pub(crate) fn with_born_host_v6_flag(&mut self) {
298 self.routing.with_born_host_v6_flag();
299 }
300
301 #[inline]
303 pub(crate) fn with_store_host_v6_flag(&mut self) {
304 let mut sys_flag = self.sys_flag();
305 self.storage.with_store_host_v6_flag(&mut sys_flag);
306 }
307}
308
309impl Default for MessageEnvelope {
310 fn default() -> Self {
311 Self {
312 message: Message::default(),
313 routing: RoutingContext::default(),
314 storage: StorageMetadata::default(),
315 msg_id: CheetahString::new(),
316 body_crc: 0,
317 reconsume_times: 0,
318 prepared_transaction_offset: 0,
319 }
320 }
321}
322
323impl Display for MessageEnvelope {
324 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325 write!(
326 f,
327 "MessageEnvelope {{ topic: {}, queue_id: {}, queue_offset: {}, commit_log_offset: {}, msg_id: {}, \
328 reconsume_times: {} }}",
329 self.topic(),
330 self.queue_id(),
331 self.queue_offset(),
332 self.commit_log_offset(),
333 self.msg_id,
334 self.reconsume_times
335 )
336 }
337}
338
339impl crate::common::message::MessageTrait for MessageEnvelope {
341 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
342 self.message.put_property(key, value);
343 }
344
345 fn clear_property(&mut self, name: &str) {
346 self.message.clear_property(name);
347 }
348
349 fn property(&self, name: &CheetahString) -> Option<CheetahString> {
350 self.message.get_property(name)
351 }
352
353 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
354 self.message.property_ref(name)
355 }
356
357 fn topic(&self) -> &CheetahString {
358 self.topic()
359 }
360
361 fn set_topic(&mut self, topic: CheetahString) {
362 self.message.set_topic(topic);
363 }
364
365 fn get_flag(&self) -> i32 {
366 self.flag()
367 }
368
369 fn set_flag(&mut self, flag: i32) {
370 self.message.set_flag(flag);
371 }
372
373 fn get_body(&self) -> Option<&Bytes> {
374 self.message.get_body()
375 }
376
377 fn set_body(&mut self, body: Bytes) {
378 self.message.set_body(Some(body));
379 }
380
381 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
382 self.properties()
383 }
384
385 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
386 self.message.set_properties(properties);
387 }
388
389 fn transaction_id(&self) -> Option<&CheetahString> {
390 MessageTrait::transaction_id(&self.message)
391 }
392
393 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
394 self.message.set_transaction_id(transaction_id);
395 }
396
397 fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
398 self.message.get_compressed_body_mut()
399 }
400
401 fn get_compressed_body(&self) -> Option<&Bytes> {
402 self.message.get_compressed_body()
403 }
404
405 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
406 self.message.set_compressed_body_mut(compressed_body);
407 }
408
409 fn take_body(&mut self) -> Option<Bytes> {
410 self.message.take_body()
411 }
412
413 fn as_any(&self) -> &dyn std::any::Any {
414 self
415 }
416
417 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
418 self
419 }
420}
421
#[cfg(test)]
mod tests {
    use super::*;

    /// Envelope built from default components with a known id (`"test-msg-id"`)
    /// and CRC (`12345`); the remaining numeric fields are zero.
    fn create_test_envelope() -> MessageEnvelope {
        MessageEnvelope::new(
            Message::default(),
            RoutingContext::default(),
            StorageMetadata::default(),
            CheetahString::from_static_str("test-msg-id"),
            12345,
            0,
            0,
        )
    }

    /// Constructor arguments are observable through the getters.
    #[test]
    fn test_envelope_creation() {
        let env = create_test_envelope();
        assert_eq!(env.msg_id().as_str(), "test-msg-id");
        assert_eq!(env.body_crc(), 12345);
        assert_eq!(env.reconsume_times(), 0);
    }

    /// Smoke test: the message-delegating accessors can be called.
    #[test]
    fn test_message_access() {
        let env = create_test_envelope();
        let _ = env.message();
        let _ = env.topic();
        let _ = env.body();
        let _ = env.properties();
    }

    /// Smoke test: the routing-delegating accessors can be called.
    #[test]
    fn test_routing_access() {
        let env = create_test_envelope();
        let _ = env.born_host();
        let _ = env.born_timestamp();
        let _ = env.sys_flag();
    }

    /// Smoke test: the storage-delegating accessors can be called.
    #[test]
    fn test_storage_access() {
        let env = create_test_envelope();
        let _ = env.broker_name();
        let _ = env.queue_id();
        let _ = env.queue_offset();
        let _ = env.commit_log_offset();
    }

    /// Each crate-internal setter is reflected by its getter.
    #[test]
    fn test_setters() {
        let mut env = create_test_envelope();

        env.set_msg_id(CheetahString::from_static_str("new-msg-id"));
        env.set_reconsume_times(5);
        env.set_body_crc(99999);
        env.set_prepared_transaction_offset(1000);

        assert_eq!(env.msg_id().as_str(), "new-msg-id");
        assert_eq!(env.reconsume_times(), 5);
        assert_eq!(env.body_crc(), 99999);
        assert_eq!(env.prepared_transaction_offset(), 1000);
    }

    /// The `Display` output names the type and includes the message id.
    #[test]
    fn test_display() {
        let rendered = create_test_envelope().to_string();
        assert!(rendered.contains("MessageEnvelope"));
        assert!(rendered.contains("test-msg-id"));
    }
}