rocketmq_remoting/codec/remoting_command_codec.rs
1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::BufMut;
16use bytes::Bytes;
17use bytes::BytesMut;
18use rocketmq_error::RocketmqError;
19use tokio_util::codec::BytesCodec;
20use tokio_util::codec::Decoder;
21use tokio_util::codec::Encoder;
22
23use crate::protocol::remoting_command::RemotingCommand;
24
/// Stateless codec that converts between raw bytes and `RemotingCommand` values.
///
/// The type carries no data (its only field is `()`), so it is `Copy` and all instances
/// compare equal. Because that field is private, code outside this module obtains an
/// instance through [`RemotingCommandCodec::new`] or [`Default::default`].
///
/// # Trait implementations
///
/// * `Decoder` - delegates to `RemotingCommand::decode`.
/// * `Encoder<RemotingCommand>` - writes the command header followed by the optional body.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RemotingCommandCodec(());
47
48impl Default for RemotingCommandCodec {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl RemotingCommandCodec {
55 pub fn new() -> Self {
56 RemotingCommandCodec(())
57 }
58}
59
60impl Decoder for RemotingCommandCodec {
61 type Error = rocketmq_error::RocketMQError;
62 type Item = RemotingCommand;
63
64 /// Decodes a `RemotingCommand` from a `BytesMut` buffer.
65 ///
66 /// This method takes a mutable reference to a `BytesMut` buffer as a parameter.
67 /// It first checks if there are at least 4 bytes in the buffer, if not, it returns `Ok(None)`.
68 /// Then it reads the total size of the incoming data as a big-endian i32 from the first 4
69 /// bytes. If the available data is less than the total size, it returns `Ok(None)`.
70 /// It then splits the `BytesMut` buffer to get the command data including the total size and
71 /// discards the first i32 (total size). It reads the header length as a big-endian i32 and
72 /// checks if the header length is greater than the total size minus 4. If it is, it returns
73 /// an error. It then splits the buffer again to get the header data and deserializes it
74 /// into a `RemotingCommand`. If the total size minus 4 is greater than the header length,
75 /// it sets the body of the `RemotingCommand`.
76 ///
77 /// # Arguments
78 ///
79 /// * `src` - A mutable reference to a `BytesMut` buffer from which the `RemotingCommand` will
80 /// be decoded.
81 ///
82 /// # Returns
83 ///
84 /// * `Result<Option<Self::Item>, Self::Error>` - Returns `Ok(Some(cmd))` if the decoding is
85 /// successful, otherwise returns an `Err` with a `RemotingError`.
86 ///
87 /// # Errors
88 ///
89 /// This function will return an error if the decoding process fails.
90 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
91 RemotingCommand::decode(src)
92 }
93}
94
95impl Encoder<RemotingCommand> for RemotingCommandCodec {
96 type Error = rocketmq_error::RocketMQError;
97
98 /// Encodes a `RemotingCommand` into a `BytesMut` buffer.
99 ///
100 /// This method takes a `RemotingCommand` and a mutable reference to a `BytesMut` buffer as
101 /// parameters. It first encodes the header of the `RemotingCommand` and calculates the
102 /// lengths of the header and body. It then reserves the necessary space in the `BytesMut`
103 /// buffer and writes the total length, serialize type, header, and body to the buffer.
104 ///
105 /// # Arguments
106 ///
107 /// * `item` - A `RemotingCommand` that is to be encoded.
108 /// * `dst` - A mutable reference to a `BytesMut` buffer where the encoded command will be
109 /// written.
110 ///
111 /// # Returns
112 ///
113 /// * `Result<(), Self::Error>` - Returns `Ok(())` if the encoding is successful, otherwise
114 /// returns an `Err` with a `RemotingError`.
115 ///
116 /// # Errors
117 ///
118 /// This function will return an error if the encoding process fails.
119 fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
120 let mut item = item;
121 item.fast_header_encode(dst);
122 if let Some(body_inner) = item.take_body() {
123 dst.put(body_inner);
124 }
125 Ok(())
126 }
127}
128
129#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
130pub struct CompositeCodec {
131 bytes_codec: BytesCodec,
132 remoting_command_codec: RemotingCommandCodec,
133}
134
135impl CompositeCodec {
136 pub fn new() -> Self {
137 Self {
138 bytes_codec: BytesCodec::new(),
139 remoting_command_codec: RemotingCommandCodec::new(),
140 }
141 }
142}
143
144impl Decoder for CompositeCodec {
145 type Error = rocketmq_error::RocketMQError;
146 type Item = RemotingCommand;
147
148 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, rocketmq_error::RocketMQError> {
149 self.remoting_command_codec.decode(src)
150 }
151}
152
153impl Encoder<Bytes> for CompositeCodec {
154 type Error = rocketmq_error::RocketMQError;
155
156 fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
157 self.bytes_codec.encode(item, dst).map_err(|error| {
158 RocketmqError::RemotingCommandEncoderError(format!("Error encoding bytes: {error}")).into()
159 })
160 }
161}
162
#[cfg(test)]
mod tests {
    //! Unit tests for `RemotingCommandCodec`.
    //!
    //! Everything exercised here is synchronous, so plain `#[test]` functions are used and no
    //! async runtime is started.

    use bytes::Bytes;

    use super::*;
    use crate::protocol::header::client_request_header::GetRouteInfoRequestHeader;
    use crate::protocol::LanguageCode;

    /// A buffer holding only the 4-byte length prefix is not yet a complete frame.
    #[test]
    fn decode_handles_insufficient_data() {
        let mut decoder = RemotingCommandCodec::new();
        let mut src = BytesMut::from(&[0, 0, 0, 1][..]);
        assert!(matches!(decoder.decode(&mut src), Ok(None)));
    }

    /// A declared total size that is too small must be rejected with an error.
    #[test]
    fn decode_handles_invalid_total_size() {
        let mut decoder = RemotingCommandCodec::new();
        // total_size = 1, which is less than minimum required (4 bytes for serialize_type)
        let mut src = BytesMut::from(&[0, 0, 0, 1, 0, 0, 0, 0][..]);
        assert!(decoder.decode(&mut src).is_err());
    }

    /// Encoding a command without a body succeeds.
    #[test]
    fn encode_handles_empty_body() {
        let mut encoder = RemotingCommandCodec::new();
        let mut dst = BytesMut::new();
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
            .set_remark_option(Some("remark".to_string()));
        assert!(encoder.encode(command, &mut dst).is_ok());
    }

    /// Encoding a command with a body succeeds and places the body at the end of the buffer.
    #[test]
    fn encode_handles_non_empty_body() {
        let mut encoder = RemotingCommandCodec::new();
        let mut dst = BytesMut::new();
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_body(Bytes::from("body"))
            .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
            .set_remark_option(Some("remark".to_string()));
        assert!(encoder.encode(command, &mut dst).is_ok());
        // The encoder appends the body after the header, so it must be the buffer's tail.
        assert!(dst.ends_with(b"body"));
    }
}
214}