Skip to main content

rocketmq_common/common/message/
message_body.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16
17/// Represents the message body, handling both raw and compressed forms.
18#[derive(Clone, Debug, Default)]
19pub struct MessageBody {
20    /// The uncompressed message body data.
21    raw: Option<Bytes>,
22    /// The compressed message body data (if compression is enabled).
23    compressed: Option<Bytes>,
24}
25
26impl MessageBody {
27    /// Creates a new message body from bytes.
28    #[inline]
29    pub fn new(data: impl Into<Bytes>) -> Self {
30        Self {
31            raw: Some(data.into()),
32            compressed: None,
33        }
34    }
35
36    /// Creates an empty message body.
37    #[inline]
38    pub const fn empty() -> Self {
39        Self {
40            raw: None,
41            compressed: None,
42        }
43    }
44
45    /// Creates a message body with both raw and compressed data.
46    #[inline]
47    pub fn from_compressed(raw: Bytes, compressed: Bytes) -> Self {
48        Self {
49            raw: Some(raw),
50            compressed: Some(compressed),
51        }
52    }
53
54    /// Returns the raw (uncompressed) body as slice.
55    #[inline]
56    pub fn as_slice(&self) -> &[u8] {
57        self.raw.as_ref().map(|b| &b[..]).unwrap_or(&[])
58    }
59
60    /// Returns the raw body as Option<&Bytes>.
61    #[inline]
62    pub fn raw(&self) -> Option<&Bytes> {
63        self.raw.as_ref()
64    }
65
66    /// Returns a mutable reference to the raw body.
67    #[inline]
68    pub fn raw_mut(&mut self) -> &mut Option<Bytes> {
69        &mut self.raw
70    }
71
72    /// Returns the compressed body, if available.
73    #[inline]
74    pub fn compressed(&self) -> Option<&Bytes> {
75        self.compressed.as_ref()
76    }
77
78    /// Returns a mutable reference to the compressed body.
79    #[inline]
80    pub fn compressed_mut(&mut self) -> &mut Option<Bytes> {
81        &mut self.compressed
82    }
83
84    /// Sets the compressed body.
85    #[inline]
86    pub(crate) fn set_compressed(&mut self, compressed: Bytes) {
87        self.compressed = Some(compressed);
88    }
89
90    /// Returns whether the body is compressed.
91    #[inline]
92    pub fn is_compressed(&self) -> bool {
93        self.compressed.is_some()
94    }
95
96    /// Consumes and returns the raw bytes.
97    #[inline]
98    pub fn into_bytes(self) -> Bytes {
99        self.raw.unwrap_or_default()
100    }
101
102    /// Returns the size in bytes (raw body).
103    #[inline]
104    pub fn len(&self) -> usize {
105        self.raw.as_ref().map_or(0, |b| b.len())
106    }
107
108    /// Returns true if the body is empty (both raw and compressed are None or empty).
109    #[inline]
110    pub fn is_empty(&self) -> bool {
111        let raw_empty = self.raw.as_ref().is_none_or(|b| b.is_empty());
112        let compressed_empty = self.compressed.as_ref().is_none_or(|b| b.is_empty());
113        raw_empty && compressed_empty
114    }
115}
116
117impl AsRef<[u8]> for MessageBody {
118    fn as_ref(&self) -> &[u8] {
119        self.as_slice()
120    }
121}
122
123impl From<Vec<u8>> for MessageBody {
124    fn from(data: Vec<u8>) -> Self {
125        Self::new(data)
126    }
127}
128
129impl From<Bytes> for MessageBody {
130    fn from(data: Bytes) -> Self {
131        Self::new(data)
132    }
133}
134
135impl From<&[u8]> for MessageBody {
136    fn from(data: &[u8]) -> Self {
137        Self::new(Bytes::copy_from_slice(data))
138    }
139}