Skip to main content

rocketmq_remoting/codec/
remoting_command_codec.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::BufMut;
16use bytes::Bytes;
17use bytes::BytesMut;
18use rocketmq_error::RocketmqError;
19use tokio_util::codec::BytesCodec;
20use tokio_util::codec::Decoder;
21use tokio_util::codec::Encoder;
22
23use crate::protocol::remoting_command::RemotingCommand;
24
/// Tokio codec marker type for framing [`RemotingCommand`] values on a byte stream.
///
/// The codec is stateless: it wraps a private unit field, so code outside this module can
/// only obtain a value through [`RemotingCommandCodec::new`] or [`Default`], and all values
/// compare equal. The wire logic itself lives in the `Decoder` and
/// `Encoder<RemotingCommand>` implementations for this type.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RemotingCommandCodec(());

impl RemotingCommandCodec {
    /// Creates a new codec.
    ///
    /// Produces the same value as [`RemotingCommandCodec::default`]. The function is `const`
    /// so the codec can also be built in constant and static initializers.
    pub const fn new() -> Self {
        Self(())
    }
}
59
60impl Decoder for RemotingCommandCodec {
61    type Error = rocketmq_error::RocketMQError;
62    type Item = RemotingCommand;
63
64    /// Decodes a `RemotingCommand` from a `BytesMut` buffer.
65    ///
66    /// This method takes a mutable reference to a `BytesMut` buffer as a parameter.
67    /// It first checks if there are at least 4 bytes in the buffer, if not, it returns `Ok(None)`.
68    /// Then it reads the total size of the incoming data as a big-endian i32 from the first 4
69    /// bytes. If the available data is less than the total size, it returns `Ok(None)`.
70    /// It then splits the `BytesMut` buffer to get the command data including the total size and
71    /// discards the first i32 (total size). It reads the header length as a big-endian i32 and
72    /// checks if the header length is greater than the total size minus 4. If it is, it returns
73    /// an error. It then splits the buffer again to get the header data and deserializes it
74    /// into a `RemotingCommand`. If the total size minus 4 is greater than the header length,
75    /// it sets the body of the `RemotingCommand`.
76    ///
77    /// # Arguments
78    ///
79    /// * `src` - A mutable reference to a `BytesMut` buffer from which the `RemotingCommand` will
80    ///   be decoded.
81    ///
82    /// # Returns
83    ///
84    /// * `Result<Option<Self::Item>, Self::Error>` - Returns `Ok(Some(cmd))` if the decoding is
85    ///   successful, otherwise returns an `Err` with a `RemotingError`.
86    ///
87    /// # Errors
88    ///
89    /// This function will return an error if the decoding process fails.
90    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
91        RemotingCommand::decode(src)
92    }
93}
94
95impl Encoder<RemotingCommand> for RemotingCommandCodec {
96    type Error = rocketmq_error::RocketMQError;
97
98    /// Encodes a `RemotingCommand` into a `BytesMut` buffer.
99    ///
100    /// This method takes a `RemotingCommand` and a mutable reference to a `BytesMut` buffer as
101    /// parameters. It first encodes the header of the `RemotingCommand` and calculates the
102    /// lengths of the header and body. It then reserves the necessary space in the `BytesMut`
103    /// buffer and writes the total length, serialize type, header, and body to the buffer.
104    ///
105    /// # Arguments
106    ///
107    /// * `item` - A `RemotingCommand` that is to be encoded.
108    /// * `dst` - A mutable reference to a `BytesMut` buffer where the encoded command will be
109    ///   written.
110    ///
111    /// # Returns
112    ///
113    /// * `Result<(), Self::Error>` - Returns `Ok(())` if the encoding is successful, otherwise
114    ///   returns an `Err` with a `RemotingError`.
115    ///
116    /// # Errors
117    ///
118    /// This function will return an error if the encoding process fails.
119    fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
120        let mut item = item;
121        item.fast_header_encode(dst);
122        if let Some(body_inner) = item.take_body() {
123            dst.put(body_inner);
124        }
125        Ok(())
126    }
127}
128
129#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
130pub struct CompositeCodec {
131    bytes_codec: BytesCodec,
132    remoting_command_codec: RemotingCommandCodec,
133}
134
135impl CompositeCodec {
136    pub fn new() -> Self {
137        Self {
138            bytes_codec: BytesCodec::new(),
139            remoting_command_codec: RemotingCommandCodec::new(),
140        }
141    }
142}
143
144impl Decoder for CompositeCodec {
145    type Error = rocketmq_error::RocketMQError;
146    type Item = RemotingCommand;
147
148    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
149        self.remoting_command_codec.decode(src)
150    }
151}
152
153impl Encoder<Bytes> for CompositeCodec {
154    type Error = rocketmq_error::RocketMQError;
155
156    fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
157        self.bytes_codec.encode(item, dst).map_err(|error| {
158            RocketmqError::RemotingCommandEncoderError(format!("Error encoding bytes: {error}")).into()
159        })
160    }
161}
162
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;
    use crate::protocol::header::client_request_header::GetRouteInfoRequestHeader;
    use crate::protocol::LanguageCode;

    // None of these tests awaits anything, so they run as plain synchronous tests instead of
    // building an async runtime per test.

    /// A buffer holding only the 4-byte length prefix is not a complete frame yet.
    #[test]
    fn decode_handles_insufficient_data() {
        let mut decoder = RemotingCommandCodec::new();
        let mut src = BytesMut::from(&[0, 0, 0, 1][..]);
        assert!(matches!(decoder.decode(&mut src), Ok(None)));
    }

    /// A declared total size that is too small to hold the mandatory fields is rejected.
    #[test]
    fn decode_handles_invalid_total_size() {
        let mut decoder = RemotingCommandCodec::new();
        // total_size = 1, which is less than minimum required (4 bytes for serialize_type)
        let mut src = BytesMut::from(&[0, 0, 0, 1, 0, 0, 0, 0][..]);
        assert!(decoder.decode(&mut src).is_err());
    }

    /// Encoding a command without a body succeeds.
    #[test]
    fn encode_handles_empty_body() {
        let mut encoder = RemotingCommandCodec::new();
        let mut dst = BytesMut::new();
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
            .set_remark_option(Some("remark".to_string()));
        assert!(encoder.encode(command, &mut dst).is_ok());
    }

    /// Encoding a command with a body succeeds and the body bytes end the output.
    #[test]
    fn encode_handles_non_empty_body() {
        let mut encoder = RemotingCommandCodec::new();
        let mut dst = BytesMut::new();
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_body(Bytes::from("body"))
            .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
            .set_remark_option(Some("remark".to_string()));
        assert!(encoder.encode(command, &mut dst).is_ok());
        // `encode` appends the body last, so the buffer must finish with it.
        assert!(dst.ends_with(b"body"));
    }
}