rocketmq_remoting/codec/
remoting_command_codec.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use bytes::BufMut;
19use bytes::Bytes;
20use bytes::BytesMut;
21use rocketmq_error::RocketmqError;
22use tokio_util::codec::BytesCodec;
23use tokio_util::codec::Decoder;
24use tokio_util::codec::Encoder;
25
26use crate::protocol::remoting_command::RemotingCommand;
27
/// Stateless codec for [`RemotingCommand`] frames.
///
/// The type wraps a private unit field, so it is zero-sized and can only be constructed
/// through [`RemotingCommandCodec::new`] or [`Default::default`]. It implements the
/// `tokio_util` [`Decoder`] trait and [`Encoder`] for [`RemotingCommand`]; both
/// implementations delegate the actual wire format to `RemotingCommand` itself.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RemotingCommandCodec(());

impl RemotingCommandCodec {
    /// Creates a new codec.
    ///
    /// Equivalent to [`Default::default`]; provided as a `const fn` so the codec can also be
    /// built in constant contexts.
    pub const fn new() -> Self {
        Self(())
    }
}
62
63impl Decoder for RemotingCommandCodec {
64    type Error = rocketmq_error::RocketMQError;
65    type Item = RemotingCommand;
66
67    /// Decodes a `RemotingCommand` from a `BytesMut` buffer.
68    ///
69    /// This method takes a mutable reference to a `BytesMut` buffer as a parameter.
70    /// It first checks if there are at least 4 bytes in the buffer, if not, it returns `Ok(None)`.
71    /// Then it reads the total size of the incoming data as a big-endian i32 from the first 4
72    /// bytes. If the available data is less than the total size, it returns `Ok(None)`.
73    /// It then splits the `BytesMut` buffer to get the command data including the total size and
74    /// discards the first i32 (total size). It reads the header length as a big-endian i32 and
75    /// checks if the header length is greater than the total size minus 4. If it is, it returns
76    /// an error. It then splits the buffer again to get the header data and deserializes it
77    /// into a `RemotingCommand`. If the total size minus 4 is greater than the header length,
78    /// it sets the body of the `RemotingCommand`.
79    ///
80    /// # Arguments
81    ///
82    /// * `src` - A mutable reference to a `BytesMut` buffer from which the `RemotingCommand` will
83    ///   be decoded.
84    ///
85    /// # Returns
86    ///
87    /// * `Result<Option<Self::Item>, Self::Error>` - Returns `Ok(Some(cmd))` if the decoding is
88    ///   successful, otherwise returns an `Err` with a `RemotingError`.
89    ///
90    /// # Errors
91    ///
92    /// This function will return an error if the decoding process fails.
93    fn decode(
94        &mut self,
95        src: &mut BytesMut,
96    ) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
97        RemotingCommand::decode(src)
98    }
99}
100
101impl Encoder<RemotingCommand> for RemotingCommandCodec {
102    type Error = rocketmq_error::RocketMQError;
103
104    /// Encodes a `RemotingCommand` into a `BytesMut` buffer.
105    ///
106    /// This method takes a `RemotingCommand` and a mutable reference to a `BytesMut` buffer as
107    /// parameters. It first encodes the header of the `RemotingCommand` and calculates the
108    /// lengths of the header and body. It then reserves the necessary space in the `BytesMut`
109    /// buffer and writes the total length, serialize type, header, and body to the buffer.
110    ///
111    /// # Arguments
112    ///
113    /// * `item` - A `RemotingCommand` that is to be encoded.
114    /// * `dst` - A mutable reference to a `BytesMut` buffer where the encoded command will be
115    ///   written.
116    ///
117    /// # Returns
118    ///
119    /// * `Result<(), Self::Error>` - Returns `Ok(())` if the encoding is successful, otherwise
120    ///   returns an `Err` with a `RemotingError`.
121    ///
122    /// # Errors
123    ///
124    /// This function will return an error if the encoding process fails.
125    fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
126        let mut item = item;
127        item.fast_header_encode(dst);
128        if let Some(body_inner) = item.take_body() {
129            dst.put(body_inner);
130        }
131        Ok(())
132    }
133}
134
135#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
136pub struct CompositeCodec {
137    bytes_codec: BytesCodec,
138    remoting_command_codec: RemotingCommandCodec,
139}
140
141impl CompositeCodec {
142    pub fn new() -> Self {
143        Self {
144            bytes_codec: BytesCodec::new(),
145            remoting_command_codec: RemotingCommandCodec::new(),
146        }
147    }
148}
149
150impl Decoder for CompositeCodec {
151    type Error = rocketmq_error::RocketMQError;
152    type Item = RemotingCommand;
153
154    fn decode(
155        &mut self,
156        src: &mut BytesMut,
157    ) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
158        self.remoting_command_codec.decode(src)
159    }
160}
161
162impl Encoder<Bytes> for CompositeCodec {
163    type Error = rocketmq_error::RocketMQError;
164
165    fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
166        self.bytes_codec.encode(item, dst).map_err(|error| {
167            RocketmqError::RemotingCommandEncoderError(format!("Error encoding bytes: {error}"))
168                .into()
169        })
170    }
171}
172
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;
    use crate::protocol::header::client_request_header::GetRouteInfoRequestHeader;
    use crate::protocol::LanguageCode;

    // The codec is fully synchronous (nothing here is awaited), so these are plain `#[test]`
    // functions rather than async tests that would spin up a runtime for no reason.

    /// A lone 4-byte length prefix is not a complete frame: the decoder must ask for more data.
    #[test]
    fn decode_handles_insufficient_data() {
        let mut decoder = RemotingCommandCodec::new();
        let mut src = BytesMut::from(&[0, 0, 0, 1][..]);
        assert!(matches!(decoder.decode(&mut src), Ok(None)));
    }

    /// A declared total size that is too small to be valid must be rejected with an error.
    #[test]
    fn decode_handles_invalid_total_size() {
        let mut decoder = RemotingCommandCodec::new();
        // total_size = 1, which is less than minimum required (4 bytes for serialize_type)
        let mut src = BytesMut::from(&[0, 0, 0, 1, 0, 0, 0, 0][..]);
        assert!(decoder.decode(&mut src).is_err());
    }

    /// Encoding a command without a body succeeds and still emits the header bytes.
    #[test]
    fn encode_handles_empty_body() {
        let mut encoder = RemotingCommandCodec::new();
        let mut dst = BytesMut::new();
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
            .set_remark_option(Some("remark".to_string()));
        assert!(encoder.encode(command, &mut dst).is_ok());
        // `encode` never returns `Err`, so also check that something was actually written.
        assert!(!dst.is_empty());
    }

    /// Encoding a command with a body succeeds and places the body at the end of the output.
    #[test]
    fn encode_handles_non_empty_body() {
        let mut encoder = RemotingCommandCodec::new();
        let mut dst = BytesMut::new();
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_body(Bytes::from("body"))
            .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
            .set_remark_option(Some("remark".to_string()));
        assert!(encoder.encode(command, &mut dst).is_ok());
        // The encoder appends the body after the header, so it must be the trailing bytes.
        assert!(dst.ends_with(b"body"));
    }
}
224}