rocketmq_remoting/protocol/
rocketmq_serializable.rs1use std::collections::HashMap;
19use std::str;
20
21use bytes::Buf;
22use bytes::BufMut;
23use bytes::Bytes;
24use bytes::BytesMut;
25use cheetah_string::CheetahString;
26use rocketmq_error::RocketmqError;
27
28use crate::protocol::remoting_command::RemotingCommand;
29use crate::protocol::LanguageCode;
30
/// Stateless namespace type for the RocketMQ binary header codec.
///
/// All functionality is exposed through associated functions; the type carries no data.
/// The standard derives make it usable in `{:?}` output and generic contexts.
#[derive(Debug, Clone, Copy, Default)]
pub struct RocketMQSerializable;
32
33impl RocketMQSerializable {
34 #[inline]
36 pub fn write_str(buf: &mut BytesMut, use_short_length: bool, s: &str) -> usize {
37 let bytes = s.as_bytes();
38 let len = bytes.len();
39
40 let length_size = if use_short_length {
41 buf.put_u16(len as u16);
42 2
43 } else {
44 buf.put_u32(len as u32);
45 4
46 };
47
48 buf.put_slice(bytes); length_size + len
50 }
51
52 #[inline]
54 pub fn read_str(
55 buf: &mut BytesMut,
56 use_short_length: bool,
57 limit: usize,
58 ) -> rocketmq_error::RocketMQResult<Option<CheetahString>> {
59 let len = if use_short_length {
61 if buf.remaining() < 2 {
62 return Err(RocketmqError::DecodingError(2, buf.remaining()).into());
63 }
64 buf.get_u16() as usize
65 } else {
66 if buf.remaining() < 4 {
67 return Err(RocketmqError::DecodingError(4, buf.remaining()).into());
68 }
69 buf.get_u32() as usize
70 };
71
72 if len == 0 {
74 return Ok(None);
75 }
76
77 if len > limit {
79 return Err(RocketmqError::DecodingError(len, limit).into());
80 }
81
82 if buf.remaining() < len {
84 return Err(RocketmqError::DecodingError(len, buf.remaining()).into());
85 }
86
87 let bytes = buf.split_to(len).freeze();
89 Ok(Some(CheetahString::from_bytes(bytes)))
90 }
91
92 #[inline]
94 pub fn rocketmq_protocol_encode(cmd: &mut RemotingCommand, buf: &mut BytesMut) -> usize {
95 let begin_index = buf.len();
96
97 let estimated_size = Self::estimate_encode_size(cmd);
99 buf.reserve(estimated_size);
100
101 buf.put_u16(cmd.code() as u16); buf.put_u8(cmd.language().get_code()); buf.put_u16(cmd.version() as u16); buf.put_i32(cmd.opaque()); buf.put_i32(cmd.flag()); if let Some(remark) = cmd.remark() {
110 Self::write_str(buf, false, remark.as_str());
111 } else {
112 buf.put_i32(0);
113 }
114
115 let map_len_index = buf.len();
117 buf.put_i32(0);
118
119 if let Some(header) = cmd.command_custom_header_mut() {
121 if header.support_fast_codec() {
122 header.encode_fast(buf);
123 }
124 }
125
126 if let Some(ext_fields) = cmd.ext_fields() {
128 for (k, v) in ext_fields.iter() {
129 if !k.is_empty() && !v.is_empty() {
131 Self::write_str(buf, true, k.as_str());
132 Self::write_str(buf, true, v.as_str());
133 }
134 }
135 }
136
137 let current_length = buf.len();
139 let ext_fields_length = (current_length - map_len_index - 4) as i32;
140 buf[map_len_index..map_len_index + 4].copy_from_slice(&ext_fields_length.to_be_bytes());
141
142 buf.len() - begin_index
143 }
144
145 #[inline]
147 fn estimate_encode_size(cmd: &RemotingCommand) -> usize {
148 let mut size = 15; if let Some(remark) = cmd.remark() {
152 size += 4 + remark.len(); } else {
154 size += 4; }
156
157 if let Some(ext) = cmd.ext_fields() {
159 for (k, v) in ext.iter() {
160 if !k.is_empty() && !v.is_empty() {
161 size += 2 + k.len() + 2 + v.len(); }
163 }
164 }
165
166 size
167 }
168
169 pub fn rocket_mq_protocol_encode_bytes(cmd: &RemotingCommand) -> Bytes {
170 let remark_bytes = cmd.remark().map(|remark| remark.as_bytes().to_vec());
171 let remark_len = remark_bytes.as_ref().map_or(0, |v| v.len());
172
173 let ext_fields_bytes = if let Some(ext) = cmd.get_ext_fields() {
174 Self::map_serialize(ext)
175 } else {
176 None
177 };
178 let ext_len = ext_fields_bytes.as_ref().map_or(0, |v| v.len());
179
180 let total_len = Self::cal_total_len(remark_len, ext_len);
181 let mut header_buffer = BytesMut::with_capacity(total_len);
182
183 header_buffer.put_i16(cmd.code() as i16);
185
186 header_buffer.put_u8(cmd.language().get_code());
188
189 header_buffer.put_i16(cmd.version() as i16);
191
192 header_buffer.put_i32(cmd.opaque());
194
195 header_buffer.put_i32(cmd.flag());
197
198 if let Some(remark_bytes) = remark_bytes {
200 header_buffer.put_i32(remark_bytes.len() as i32);
201 header_buffer.put(remark_bytes.as_ref());
202 } else {
203 header_buffer.put_i32(0);
204 }
205
206 if let Some(ext_fields_bytes) = ext_fields_bytes {
208 header_buffer.put_i32(ext_fields_bytes.len() as i32);
209 header_buffer.put(ext_fields_bytes.as_ref());
210 } else {
211 header_buffer.put_i32(0);
212 }
213
214 header_buffer.freeze()
215 }
216
217 #[inline]
219 pub fn map_serialize(map: &HashMap<CheetahString, CheetahString>) -> Option<BytesMut> {
220 if map.is_empty() {
221 return None;
222 }
223
224 let mut total_length = 0;
226 let mut valid_entries = 0;
227
228 for (key, value) in map.iter() {
229 if !key.is_empty() && !value.is_empty() {
230 total_length += 2 + key.len() + 4 + value.len();
231 valid_entries += 1;
232 }
233 }
234
235 if valid_entries == 0 {
236 return None;
237 }
238
239 let mut content = BytesMut::with_capacity(total_length);
241
242 for (key, value) in map.iter() {
244 if !key.is_empty() && !value.is_empty() {
245 content.put_u16(key.len() as u16);
247 content.put_slice(key.as_bytes());
248
249 content.put_i32(value.len() as i32);
251 content.put_slice(value.as_bytes());
252 }
253 }
254
255 Some(content)
256 }
257
258 pub fn cal_total_len(remark_len: usize, ext_len: usize) -> usize {
259 2 + 1 + 2 + 4 + 4 + 4 + remark_len + 4 + ext_len }
275
276 pub fn rocket_mq_protocol_decode(
277 header_buffer: &mut BytesMut,
278 header_len: usize,
279 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
280 let cmd = RemotingCommand::default()
281 .set_code(header_buffer.get_i16())
282 .set_language(LanguageCode::value_of(header_buffer.get_u8()).unwrap())
283 .set_version(header_buffer.get_i16() as i32)
284 .set_opaque(header_buffer.get_i32())
285 .set_flag(header_buffer.get_i32());
286
287 let remark = Self::read_str(header_buffer, false, header_len)?;
288
289 let ext_fields_length = header_buffer.get_i32() as usize;
291 let ext = if ext_fields_length > 0 {
292 if ext_fields_length > header_len {
293 return Err(RocketmqError::DecodingError(ext_fields_length, header_len).into());
294 }
295 Self::map_deserialize(header_buffer, ext_fields_length)?
296 } else {
297 HashMap::new()
298 };
299
300 Ok(cmd.set_remark_option(remark).set_ext_fields(ext))
301 }
302
303 #[inline]
305 pub fn map_deserialize(
306 buffer: &mut BytesMut,
307 len: usize,
308 ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, CheetahString>> {
309 if len == 0 {
310 return Ok(HashMap::new());
311 }
312
313 let estimated_entries = (len / 50).clamp(4, 1024);
315 let mut map = HashMap::with_capacity(estimated_entries);
316
317 let target_remaining = buffer.remaining().saturating_sub(len);
318
319 while buffer.remaining() > target_remaining {
320 let key = Self::read_str(buffer, true, len)?
322 .ok_or_else(|| RocketmqError::DecodingError(0, 0))?;
323
324 let value = Self::read_str(buffer, false, len)?
326 .ok_or_else(|| RocketmqError::DecodingError(0, 0))?;
327
328 map.insert(key, value);
329 }
330
331 Ok(map)
332 }
333}
334
#[cfg(test)]
mod tests {
    use bytes::BytesMut;

    use super::*;

    /// `"test"` preceded by a 2-byte length.
    const SHORT_FRAMED: &[u8] = &[0, 4, 116, 101, 115, 116];
    /// `"test"` preceded by a 4-byte length.
    const LONG_FRAMED: &[u8] = &[0, 0, 0, 4, 116, 101, 115, 116];
    /// One map entry: `"key"` (2-byte length) followed by `"value"` (4-byte length).
    const KEY_VALUE_ENTRY: &[u8] = &[0, 3, 107, 101, 121, 0, 0, 0, 5, 118, 97, 108, 117, 101];

    #[test]
    fn write_str_short_length() {
        let mut out = BytesMut::new();
        let byte_count = RocketMQSerializable::write_str(&mut out, true, "test");
        assert_eq!(byte_count, 6);
        assert_eq!(out, BytesMut::from(SHORT_FRAMED));
    }

    #[test]
    fn write_str_long_length() {
        let mut out = BytesMut::new();
        let byte_count = RocketMQSerializable::write_str(&mut out, false, "test");
        assert_eq!(byte_count, 8);
        assert_eq!(out, BytesMut::from(LONG_FRAMED));
    }

    #[test]
    fn read_str_short_length() {
        let mut input = BytesMut::from(SHORT_FRAMED);
        let expected: CheetahString = "test".into();
        let decoded = RocketMQSerializable::read_str(&mut input, true, 10).unwrap();
        assert_eq!(decoded, Some(expected));
    }

    #[test]
    fn read_str_long_length() {
        let mut input = BytesMut::from(LONG_FRAMED);
        let expected: CheetahString = "test".into();
        let decoded = RocketMQSerializable::read_str(&mut input, false, 10).unwrap();
        assert_eq!(decoded, Some(expected));
    }

    #[test]
    fn read_str_exceeds_limit() {
        let mut input = BytesMut::from(LONG_FRAMED);
        assert!(RocketMQSerializable::read_str(&mut input, false, 2).is_err());
    }

    #[test]
    fn map_serialize_empty() {
        let empty = HashMap::new();
        assert!(RocketMQSerializable::map_serialize(&empty).is_none());
    }

    #[test]
    fn map_serialize_non_empty() {
        let mut fields = HashMap::new();
        fields.insert("key".into(), "value".into());
        let encoded = RocketMQSerializable::map_serialize(&fields).unwrap();
        assert_eq!(encoded, BytesMut::from(KEY_VALUE_ENTRY));
    }

    #[test]
    fn map_deserialize_empty() {
        let mut input = BytesMut::new();
        let decoded = RocketMQSerializable::map_deserialize(&mut input, 0).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn map_deserialize_non_empty() {
        let mut input = BytesMut::from(KEY_VALUE_ENTRY);
        let mut expected: HashMap<CheetahString, CheetahString> = HashMap::new();
        expected.insert("key".into(), "value".into());
        let decoded = RocketMQSerializable::map_deserialize(&mut input, 14).unwrap();
        assert_eq!(decoded, expected);
    }
}