Skip to main content

rocketmq_common/common/message/
message_envelope.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::fmt::Display;
18use std::net::SocketAddr;
19
20use bytes::Bytes;
21use cheetah_string::CheetahString;
22
23use crate::common::message::message_client_id_setter::MessageClientIDSetter;
24use crate::common::message::message_single::Message;
25use crate::common::message::routing_context::RoutingContext;
26use crate::common::message::storage_metadata::StorageMetadata;
27use crate::common::message::MessageTrait;
28
29/// Complete message envelope (stored message)
30///
31/// Represents a message that has been received and stored by the Broker, containing complete
32/// lifecycle information.
33///
34/// # Use Cases
35///
36/// - Consumer receiving messages
37/// - Message query and tracking
38/// - Message passing between Broker internal modules
39#[derive(Clone, Debug)]
40pub struct MessageEnvelope {
41    /// Message content
42    message: Message,
43
44    /// Routing context (network layer)
45    routing: RoutingContext,
46
47    /// Storage metadata (persistence layer)
48    storage: StorageMetadata,
49
50    /// Message ID (calculated from store_host + commit_log_offset)
51    msg_id: CheetahString,
52
53    /// Message body CRC checksum
54    body_crc: u32,
55
56    /// Reconsume times
57    reconsume_times: i32,
58
59    /// Prepared transaction offset (only valid for transaction messages)
60    prepared_transaction_offset: i64,
61}
62
63impl MessageEnvelope {
64    /// Creates a new message envelope
65    #[allow(clippy::too_many_arguments)]
66    pub fn new(
67        message: Message,
68        routing: RoutingContext,
69        storage: StorageMetadata,
70        msg_id: CheetahString,
71        body_crc: u32,
72        reconsume_times: i32,
73        prepared_transaction_offset: i64,
74    ) -> Self {
75        Self {
76            message,
77            routing,
78            storage,
79            msg_id,
80            body_crc,
81            reconsume_times,
82            prepared_transaction_offset,
83        }
84    }
85
86    // ===== Message Access =====
87
88    /// Gets the message reference
89    #[inline]
90    pub fn message(&self) -> &Message {
91        &self.message
92    }
93
94    /// Gets the mutable message reference
95    #[inline]
96    pub fn message_mut(&mut self) -> &mut Message {
97        &mut self.message
98    }
99
100    /// Gets the message topic
101    #[inline]
102    pub fn topic(&self) -> &CheetahString {
103        self.message.topic()
104    }
105
106    /// Gets the message body
107    #[inline]
108    pub fn body(&self) -> Option<Bytes> {
109        self.message.body()
110    }
111
112    /// Gets the message properties
113    #[inline]
114    pub fn properties(&self) -> &HashMap<CheetahString, CheetahString> {
115        self.message.properties().as_map()
116    }
117
118    /// Gets the message tags
119    #[inline]
120    pub fn tags(&self) -> Option<CheetahString> {
121        self.message.get_tags()
122    }
123
124    /// Gets the message keys
125    #[inline]
126    pub fn keys(&self) -> Option<Vec<String>> {
127        self.message.keys()
128    }
129
130    /// Gets the message flag
131    #[inline]
132    pub fn flag(&self) -> i32 {
133        self.message.flag()
134    }
135
136    // ===== Routing Access =====
137
138    /// Gets the routing context
139    #[inline]
140    pub fn routing(&self) -> &RoutingContext {
141        &self.routing
142    }
143
144    /// Gets the mutable routing context (internal use)
145    #[inline]
146    pub(crate) fn routing_mut(&mut self) -> &mut RoutingContext {
147        &mut self.routing
148    }
149
150    /// Gets the message sender's client address
151    #[inline]
152    pub fn born_host(&self) -> SocketAddr {
153        self.routing.born_host()
154    }
155
156    /// Gets the message creation timestamp
157    #[inline]
158    pub fn born_timestamp(&self) -> i64 {
159        self.routing.born_timestamp()
160    }
161
162    /// Gets the system flag bits
163    #[inline]
164    pub fn sys_flag(&self) -> i32 {
165        self.routing.sys_flag()
166    }
167
168    /// Gets the born host bytes
169    #[inline]
170    pub fn born_host_bytes(&self) -> Bytes {
171        self.routing.born_host_bytes()
172    }
173
174    // ===== Storage Access =====
175
176    /// Gets the storage metadata
177    #[inline]
178    pub fn storage(&self) -> &StorageMetadata {
179        &self.storage
180    }
181
182    /// Gets the mutable storage metadata (internal use)
183    #[inline]
184    pub(crate) fn storage_mut(&mut self) -> &mut StorageMetadata {
185        &mut self.storage
186    }
187
188    /// Gets the broker name
189    #[inline]
190    pub fn broker_name(&self) -> &str {
191        self.storage.broker_name()
192    }
193
194    /// Gets the queue ID
195    #[inline]
196    pub fn queue_id(&self) -> i32 {
197        self.storage.queue_id()
198    }
199
200    /// Gets the queue logical offset
201    #[inline]
202    pub fn queue_offset(&self) -> i64 {
203        self.storage.queue_offset()
204    }
205
206    /// Gets the CommitLog physical offset
207    #[inline]
208    pub fn commit_log_offset(&self) -> i64 {
209        self.storage.commit_log_offset()
210    }
211
212    /// Gets the storage timestamp
213    #[inline]
214    pub fn store_timestamp(&self) -> i64 {
215        self.storage.store_timestamp()
216    }
217
218    /// Gets the storage host address
219    #[inline]
220    pub fn store_host(&self) -> SocketAddr {
221        self.storage.store_host()
222    }
223
224    /// Gets the storage size in bytes
225    #[inline]
226    pub fn store_size(&self) -> i32 {
227        self.storage.store_size()
228    }
229
230    /// Gets the store host bytes
231    #[inline]
232    pub fn store_host_bytes(&self) -> Bytes {
233        self.storage.store_host_bytes()
234    }
235
236    // ===== Other Metadata Access =====
237
238    /// Gets the message ID (offset-based)
239    #[inline]
240    pub fn msg_id(&self) -> &CheetahString {
241        &self.msg_id
242    }
243
244    /// Gets the client message ID (UNIQ_ID preferred, otherwise returns offset msg_id)
245    ///
246    /// This is the client-side message ID, preferring UNIQ_ID (globally unique)
247    pub fn client_msg_id(&self) -> CheetahString {
248        MessageClientIDSetter::get_uniq_id(self).unwrap_or_else(|| self.msg_id.clone())
249    }
250
251    /// Gets the message body CRC
252    #[inline]
253    pub fn body_crc(&self) -> u32 {
254        self.body_crc
255    }
256
257    /// Gets the reconsume times
258    #[inline]
259    pub fn reconsume_times(&self) -> i32 {
260        self.reconsume_times
261    }
262
263    /// Gets the prepared transaction offset
264    #[inline]
265    pub fn prepared_transaction_offset(&self) -> i64 {
266        self.prepared_transaction_offset
267    }
268
269    // ===== Modification Methods (Internal Use) =====
270
271    /// Sets the message ID
272    #[inline]
273    pub(crate) fn set_msg_id(&mut self, msg_id: CheetahString) {
274        self.msg_id = msg_id;
275    }
276
277    /// Sets the reconsume times
278    #[inline]
279    pub(crate) fn set_reconsume_times(&mut self, reconsume_times: i32) {
280        self.reconsume_times = reconsume_times;
281    }
282
283    /// Sets the body CRC
284    #[inline]
285    pub(crate) fn set_body_crc(&mut self, body_crc: u32) {
286        self.body_crc = body_crc;
287    }
288
289    /// Sets the prepared transaction offset
290    #[inline]
291    pub(crate) fn set_prepared_transaction_offset(&mut self, offset: i64) {
292        self.prepared_transaction_offset = offset;
293    }
294
295    /// Sets the born host v6 flag
296    #[inline]
297    pub(crate) fn with_born_host_v6_flag(&mut self) {
298        self.routing.with_born_host_v6_flag();
299    }
300
301    /// Sets the store host v6 flag
302    #[inline]
303    pub(crate) fn with_store_host_v6_flag(&mut self) {
304        let mut sys_flag = self.sys_flag();
305        self.storage.with_store_host_v6_flag(&mut sys_flag);
306    }
307}
308
309impl Default for MessageEnvelope {
310    fn default() -> Self {
311        Self {
312            message: Message::default(),
313            routing: RoutingContext::default(),
314            storage: StorageMetadata::default(),
315            msg_id: CheetahString::new(),
316            body_crc: 0,
317            reconsume_times: 0,
318            prepared_transaction_offset: 0,
319        }
320    }
321}
322
323impl Display for MessageEnvelope {
324    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325        write!(
326            f,
327            "MessageEnvelope {{ topic: {}, queue_id: {}, queue_offset: {}, commit_log_offset: {}, msg_id: {}, \
328             reconsume_times: {} }}",
329            self.topic(),
330            self.queue_id(),
331            self.queue_offset(),
332            self.commit_log_offset(),
333            self.msg_id,
334            self.reconsume_times
335        )
336    }
337}
338
339// Implement MessageTrait for MessageEnvelope
340impl crate::common::message::MessageTrait for MessageEnvelope {
341    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
342        self.message.put_property(key, value);
343    }
344
345    fn clear_property(&mut self, name: &str) {
346        self.message.clear_property(name);
347    }
348
349    fn property(&self, name: &CheetahString) -> Option<CheetahString> {
350        self.message.get_property(name)
351    }
352
353    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
354        self.message.property_ref(name)
355    }
356
357    fn topic(&self) -> &CheetahString {
358        self.topic()
359    }
360
361    fn set_topic(&mut self, topic: CheetahString) {
362        self.message.set_topic(topic);
363    }
364
365    fn get_flag(&self) -> i32 {
366        self.flag()
367    }
368
369    fn set_flag(&mut self, flag: i32) {
370        self.message.set_flag(flag);
371    }
372
373    fn get_body(&self) -> Option<&Bytes> {
374        self.message.get_body()
375    }
376
377    fn set_body(&mut self, body: Bytes) {
378        self.message.set_body(Some(body));
379    }
380
381    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
382        self.properties()
383    }
384
385    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
386        self.message.set_properties(properties);
387    }
388
389    fn transaction_id(&self) -> Option<&CheetahString> {
390        MessageTrait::transaction_id(&self.message)
391    }
392
393    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
394        self.message.set_transaction_id(transaction_id);
395    }
396
397    fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
398        self.message.get_compressed_body_mut()
399    }
400
401    fn get_compressed_body(&self) -> Option<&Bytes> {
402        self.message.get_compressed_body()
403    }
404
405    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
406        self.message.set_compressed_body_mut(compressed_body);
407    }
408
409    fn take_body(&mut self) -> Option<Bytes> {
410        self.message.take_body()
411    }
412
413    fn as_any(&self) -> &dyn std::any::Any {
414        self
415    }
416
417    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
418        self
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425
426    fn create_test_envelope() -> MessageEnvelope {
427        let message = Message::default();
428        let routing = RoutingContext::default();
429        let storage = StorageMetadata::default();
430
431        MessageEnvelope::new(
432            message,
433            routing,
434            storage,
435            CheetahString::from_static_str("test-msg-id"),
436            12345,
437            0,
438            0,
439        )
440    }
441
442    #[test]
443    fn test_envelope_creation() {
444        let envelope = create_test_envelope();
445        assert_eq!(envelope.msg_id().as_str(), "test-msg-id");
446        assert_eq!(envelope.body_crc(), 12345);
447        assert_eq!(envelope.reconsume_times(), 0);
448    }
449
450    #[test]
451    fn test_message_access() {
452        let envelope = create_test_envelope();
453        let _message = envelope.message();
454        let _topic = envelope.topic();
455        let _body = envelope.body();
456        let _properties = envelope.properties();
457    }
458
459    #[test]
460    fn test_routing_access() {
461        let envelope = create_test_envelope();
462        let _born_host = envelope.born_host();
463        let _born_timestamp = envelope.born_timestamp();
464        let _sys_flag = envelope.sys_flag();
465    }
466
467    #[test]
468    fn test_storage_access() {
469        let envelope = create_test_envelope();
470        let _broker_name = envelope.broker_name();
471        let _queue_id = envelope.queue_id();
472        let _queue_offset = envelope.queue_offset();
473        let _commit_log_offset = envelope.commit_log_offset();
474    }
475
476    #[test]
477    fn test_setters() {
478        let mut envelope = create_test_envelope();
479
480        envelope.set_msg_id(CheetahString::from_static_str("new-msg-id"));
481        envelope.set_reconsume_times(5);
482        envelope.set_body_crc(99999);
483        envelope.set_prepared_transaction_offset(1000);
484
485        assert_eq!(envelope.msg_id().as_str(), "new-msg-id");
486        assert_eq!(envelope.reconsume_times(), 5);
487        assert_eq!(envelope.body_crc(), 99999);
488        assert_eq!(envelope.prepared_transaction_offset(), 1000);
489    }
490
491    #[test]
492    fn test_display() {
493        let envelope = create_test_envelope();
494        let display_str = format!("{}", envelope);
495        assert!(display_str.contains("MessageEnvelope"));
496        assert!(display_str.contains("test-msg-id"));
497    }
498}