rocketmq_remoting/codec/remoting_command_codec.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use bytes::BufMut;
19use bytes::Bytes;
20use bytes::BytesMut;
21use rocketmq_error::RocketmqError;
22use tokio_util::codec::BytesCodec;
23use tokio_util::codec::Decoder;
24use tokio_util::codec::Encoder;
25
26use crate::protocol::remoting_command::RemotingCommand;
27
28/// Encodes a `RemotingCommand` into a `BytesMut` buffer.
29///
30/// This method takes a `RemotingCommand` and a mutable reference to a `BytesMut` buffer as
31/// parameters. It first encodes the header of the `RemotingCommand` and calculates the lengths of
32/// the header and body. It then reserves the necessary space in the `BytesMut` buffer and writes
33/// the total length, serialize type, header, and body to the buffer.
34///
35/// # Arguments
36///
37/// * `item` - A `RemotingCommand` that is to be encoded.
38/// * `dst` - A mutable reference to a `BytesMut` buffer where the encoded command will be written.
39///
40/// # Returns
41///
42/// * `Result<(), Self::Error>` - Returns `Ok(())` if the encoding is successful, otherwise returns
43/// an `Err` with a `RemotingError`.
44///
45/// # Errors
46///
47/// This function will return an error if the encoding process fails.
48#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
49pub struct RemotingCommandCodec(());
50
51impl Default for RemotingCommandCodec {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl RemotingCommandCodec {
58 pub fn new() -> Self {
59 RemotingCommandCodec(())
60 }
61}
62
63impl Decoder for RemotingCommandCodec {
64 type Error = rocketmq_error::RocketmqError;
65 type Item = RemotingCommand;
66
67 /// Decodes a `RemotingCommand` from a `BytesMut` buffer.
68 ///
69 /// This method takes a mutable reference to a `BytesMut` buffer as a parameter.
70 /// It first checks if there are at least 4 bytes in the buffer, if not, it returns `Ok(None)`.
71 /// Then it reads the total size of the incoming data as a big-endian i32 from the first 4
72 /// bytes. If the available data is less than the total size, it returns `Ok(None)`.
73 /// It then splits the `BytesMut` buffer to get the command data including the total size and
74 /// discards the first i32 (total size). It reads the header length as a big-endian i32 and
75 /// checks if the header length is greater than the total size minus 4. If it is, it returns
76 /// an error. It then splits the buffer again to get the header data and deserializes it
77 /// into a `RemotingCommand`. If the total size minus 4 is greater than the header length,
78 /// it sets the body of the `RemotingCommand`.
79 ///
80 /// # Arguments
81 ///
82 /// * `src` - A mutable reference to a `BytesMut` buffer from which the `RemotingCommand` will
83 /// be decoded.
84 ///
85 /// # Returns
86 ///
87 /// * `Result<Option<Self::Item>, Self::Error>` - Returns `Ok(Some(cmd))` if the decoding is
88 /// successful, otherwise returns an `Err` with a `RemotingError`.
89 ///
90 /// # Errors
91 ///
92 /// This function will return an error if the decoding process fails.
93 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
94 RemotingCommand::decode(src)
95 }
96}
97
98impl Encoder<RemotingCommand> for RemotingCommandCodec {
99 type Error = rocketmq_error::RocketmqError;
100
101 /// Encodes a `RemotingCommand` into a `BytesMut` buffer.
102 ///
103 /// This method takes a `RemotingCommand` and a mutable reference to a `BytesMut` buffer as
104 /// parameters. It first encodes the header of the `RemotingCommand` and calculates the
105 /// lengths of the header and body. It then reserves the necessary space in the `BytesMut`
106 /// buffer and writes the total length, serialize type, header, and body to the buffer.
107 ///
108 /// # Arguments
109 ///
110 /// * `item` - A `RemotingCommand` that is to be encoded.
111 /// * `dst` - A mutable reference to a `BytesMut` buffer where the encoded command will be
112 /// written.
113 ///
114 /// # Returns
115 ///
116 /// * `Result<(), Self::Error>` - Returns `Ok(())` if the encoding is successful, otherwise
117 /// returns an `Err` with a `RemotingError`.
118 ///
119 /// # Errors
120 ///
121 /// This function will return an error if the encoding process fails.
122 fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
123 let mut item = item;
124 item.fast_header_encode(dst);
125 if let Some(body_inner) = item.take_body() {
126 dst.put(body_inner);
127 }
128 Ok(())
129 }
130}
131
132#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
133pub struct CompositeCodec {
134 bytes_codec: BytesCodec,
135 remoting_command_codec: RemotingCommandCodec,
136}
137
138impl CompositeCodec {
139 pub fn new() -> Self {
140 Self {
141 bytes_codec: BytesCodec::new(),
142 remoting_command_codec: RemotingCommandCodec::new(),
143 }
144 }
145}
146
147impl Decoder for CompositeCodec {
148 type Error = rocketmq_error::RocketmqError;
149 type Item = RemotingCommand;
150
151 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
152 self.remoting_command_codec.decode(src)
153 }
154}
155
156impl Encoder<Bytes> for CompositeCodec {
157 type Error = rocketmq_error::RocketmqError;
158
159 fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
160 self.bytes_codec.encode(item, dst).map_err(|error| {
161 RocketmqError::RemotingCommandEncoderError(format!("Error encoding bytes: {error}"))
162 })
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use bytes::Bytes;
169
170 use super::*;
171 use crate::protocol::header::client_request_header::GetRouteInfoRequestHeader;
172 use crate::protocol::LanguageCode;
173
174 #[tokio::test]
175 async fn decode_handles_insufficient_data() {
176 let mut decoder = RemotingCommandCodec::new();
177 let mut src = BytesMut::from(&[0, 0, 0, 1][..]);
178 assert!(matches!(decoder.decode(&mut src), Ok(None)));
179 }
180
181 #[tokio::test]
182 async fn decode_handles_sufficient_data() {
183 let mut decoder = RemotingCommandCodec::new();
184 let mut src = BytesMut::from(&[0, 0, 0, 1, 0, 0, 0, 0][..]);
185 assert!(matches!(decoder.decode(&mut src), Ok(None)));
186 }
187
188 #[tokio::test]
189 async fn encode_handles_empty_body() {
190 let mut encoder = RemotingCommandCodec::new();
191 let mut dst = BytesMut::new();
192 let command = RemotingCommand::create_remoting_command(1)
193 .set_code(1)
194 .set_language(LanguageCode::JAVA)
195 .set_opaque(1)
196 .set_flag(1)
197 .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
198 .set_remark_option(Some("remark".to_string()));
199 assert!(encoder.encode(command, &mut dst).is_ok());
200 }
201
202 #[tokio::test]
203 async fn encode_handles_non_empty_body() {
204 let mut encoder = RemotingCommandCodec::new();
205 let mut dst = BytesMut::new();
206 let command = RemotingCommand::create_remoting_command(1)
207 .set_code(1)
208 .set_language(LanguageCode::JAVA)
209 .set_opaque(1)
210 .set_flag(1)
211 .set_body(Bytes::from("body"))
212 .set_command_custom_header(GetRouteInfoRequestHeader::new("1111", Some(true)))
213 .set_remark_option(Some("remark".to_string()));
214 assert!(encoder.encode(command, &mut dst).is_ok());
215 }
216}