rocketmq_common/common/message/
message_decoder.rs1use std::collections::HashMap;
18use std::io::Cursor;
19use std::io::Write;
20use std::net::IpAddr;
21use std::net::Ipv4Addr;
22use std::net::Ipv6Addr;
23use std::net::SocketAddr;
24use std::net::SocketAddrV4;
25use std::net::SocketAddrV6;
26use std::str;
27
28use byteorder::BigEndian;
29use byteorder::ByteOrder;
30use bytes::Buf;
31use bytes::BufMut;
32use bytes::Bytes;
33use bytes::BytesMut;
34use cheetah_string::CheetahString;
35
36use crate::common::compression::compression_type::CompressionType;
37use crate::common::compression::compressor_factory::CompressorFactory;
38use crate::common::message::message_client_ext::MessageClientExt;
39use crate::common::message::message_ext::MessageExt;
40use crate::common::message::message_id::MessageId;
41use crate::common::message::message_single::Message;
42use crate::common::message::MessageConst;
43use crate::common::message::MessageTrait;
44use crate::common::message::MessageVersion;
45use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
46use crate::utils::util_all;
47use crate::CRC32Utils::crc32;
48use crate::MessageAccessor::MessageAccessor;
49use crate::MessageUtils::build_message_id;
50
51pub const CHARSET_UTF8: &str = "UTF-8";
52pub const MESSAGE_MAGIC_CODE_POSITION: usize = 4;
53pub const MESSAGE_FLAG_POSITION: usize = 16;
54pub const MESSAGE_PHYSIC_OFFSET_POSITION: usize = 28;
55pub const MESSAGE_STORE_TIMESTAMP_POSITION: usize = 56;
56pub const MESSAGE_MAGIC_CODE: i32 = -626843481;
57pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;
58pub const BLANK_MAGIC_CODE: i32 = -875286124;
59pub const NAME_VALUE_SEPARATOR: char = '\u{0001}';
60pub const PROPERTY_SEPARATOR: char = '\u{0002}';
61pub const PHY_POS_POSITION: usize = 4 + 4 + 4 + 4 + 4 + 8;
62pub const QUEUE_OFFSET_POSITION: usize = 4 + 4 + 4 + 4 + 4;
63pub const SYSFLAG_POSITION: usize = 4 + 4 + 4 + 4 + 4 + 8 + 8;
64pub const BORN_TIMESTAMP_POSITION: usize = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8;
65
66pub fn string_to_message_properties(
67 properties: Option<&CheetahString>,
68) -> HashMap<CheetahString, CheetahString> {
69 let mut map = HashMap::new();
70 if let Some(properties) = properties {
71 let mut index = 0;
72 let len = properties.len();
73 while index < len {
74 let new_index = properties[index..]
75 .find(PROPERTY_SEPARATOR)
76 .map_or(len, |i| index + i);
77 if new_index - index >= 3 {
78 if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR)
79 {
80 let kv_sep_index = index + kv_sep_index;
81 if kv_sep_index > index && kv_sep_index < new_index - 1 {
82 let k = &properties[index..kv_sep_index];
83 let v = &properties[kv_sep_index + 1..new_index];
84 map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
85 }
86 }
87 }
88 index = new_index + 1;
89 }
90 }
91 map
92}
93
94pub fn str_to_message_properties(
95 properties: Option<&str>,
96) -> HashMap<CheetahString, CheetahString> {
97 let mut map = HashMap::new();
98 if let Some(properties) = properties {
99 let mut index = 0;
100 let len = properties.len();
101 while index < len {
102 let new_index = properties[index..]
103 .find(PROPERTY_SEPARATOR)
104 .map_or(len, |i| index + i);
105 if new_index - index >= 3 {
106 if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR)
107 {
108 let kv_sep_index = index + kv_sep_index;
109 if kv_sep_index > index && kv_sep_index < new_index - 1 {
110 let k = &properties[index..kv_sep_index];
111 let v = &properties[kv_sep_index + 1..new_index];
112 map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
113 }
114 }
115 }
116 index = new_index + 1;
117 }
118 }
119 map
120}
121
122pub fn message_properties_to_string(
123 properties: &HashMap<CheetahString, CheetahString>,
124) -> CheetahString {
125 let mut len = 0;
126 for (name, value) in properties.iter() {
127 len += name.len();
128
129 len += value.len();
130 len += 2; }
132
133 let mut sb = String::with_capacity(len);
134 for (name, value) in properties.iter() {
135 sb.push_str(name);
136 sb.push(NAME_VALUE_SEPARATOR);
137
138 sb.push_str(value);
139 sb.push(PROPERTY_SEPARATOR);
140 }
141 CheetahString::from_string(sb)
142}
143
144pub fn decode_client(
145 byte_buffer: &mut Bytes,
146 read_body: bool,
147 de_compress_body: bool,
148 is_set_properties_string: bool,
149 check_crc: bool,
150) -> Option<MessageClientExt> {
151 decode(
166 byte_buffer,
167 read_body,
168 de_compress_body,
169 false,
170 is_set_properties_string,
171 check_crc,
172 )
173 .map(|msg_ext| MessageClientExt {
174 message_ext_inner: msg_ext,
175 })
176}
177
178pub fn decode(
180 byte_buffer: &mut Bytes,
181 read_body: bool,
182 de_compress_body: bool,
183 is_client: bool,
184 is_set_properties_string: bool,
185 check_crc: bool,
186) -> Option<MessageExt> {
187 let mut msg_ext = if is_client {
188 unimplemented!()
189 } else {
190 MessageExt::default()
191 };
192
193 let store_size = byte_buffer.get_i32();
195 msg_ext.set_store_size(store_size);
196
197 let magic_code = byte_buffer.get_i32();
199 let version = MessageVersion::value_of_magic_code(magic_code).unwrap();
200
201 let body_crc = byte_buffer.get_u32();
203 msg_ext.set_body_crc(body_crc);
204
205 let queue_id = byte_buffer.get_i32();
207 msg_ext.set_queue_id(queue_id);
208
209 let flag = byte_buffer.get_i32();
211 msg_ext.message.flag = flag;
212
213 let queue_offset = byte_buffer.get_i64();
215 msg_ext.set_queue_offset(queue_offset);
216
217 let physic_offset = byte_buffer.get_i64();
219 msg_ext.set_commit_log_offset(physic_offset);
220
221 let sys_flag = byte_buffer.get_i32();
223 msg_ext.set_sys_flag(sys_flag);
224
225 let born_time_stamp = byte_buffer.get_i64();
227 msg_ext.set_born_timestamp(born_time_stamp);
228
229 let (born_host_address, born_host_ip_length) =
231 if sys_flag & MessageSysFlag::BORNHOST_V6_FLAG != 0 {
232 let mut born_host = [0; 16];
233 byte_buffer.copy_to_slice(&mut born_host);
234 let port = byte_buffer.get_i32();
235 (
236 SocketAddr::V6(SocketAddrV6::new(
237 Ipv6Addr::from(born_host),
238 port as u16,
239 0,
240 0,
241 )),
242 16,
243 )
244 } else {
245 let mut born_host = [0; 4];
246 byte_buffer.copy_to_slice(&mut born_host);
247 let port = byte_buffer.get_i32();
248 (
249 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(born_host), port as u16)),
250 4,
251 )
252 };
253 msg_ext.set_born_host(born_host_address);
254
255 let store_timestamp = byte_buffer.get_i64();
257 msg_ext.set_store_timestamp(store_timestamp);
258
259 let (store_host_address, store_host_ip_length) =
261 if sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG != 0 {
262 let mut store_host = [0; 16];
263 byte_buffer.copy_to_slice(&mut store_host);
264 let port = byte_buffer.get_i32();
265 (
266 SocketAddr::V6(SocketAddrV6::new(
267 Ipv6Addr::from(store_host),
268 port as u16,
269 0,
270 0,
271 )),
272 16,
273 )
274 } else {
275 let mut store_host = [0; 4];
276 byte_buffer.copy_to_slice(&mut store_host);
277 let port = byte_buffer.get_i32();
278 (
279 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(store_host), port as u16)),
280 4,
281 )
282 };
283 msg_ext.set_store_host(store_host_address);
284
285 let reconsume_times = byte_buffer.get_i32();
287 msg_ext.set_reconsume_times(reconsume_times);
288
289 let prepared_transaction_offset = byte_buffer.get_i64();
291 msg_ext.set_prepared_transaction_offset(prepared_transaction_offset);
292
293 let body_len = byte_buffer.get_i32();
295 if body_len > 0 {
296 if read_body {
298 let mut body = vec![0; body_len as usize];
299 byte_buffer.copy_to_slice(&mut body);
300 if check_crc {
301 let crc = crc32(&body);
302 if crc != body_crc {
303 return None;
304 }
305 }
306 let mut body_bytes = Bytes::from(body);
307 if de_compress_body
308 && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG
309 {
310 let compression_type = CompressionType::find_by_value(
311 (flag & MessageSysFlag::COMPRESSION_TYPE_COMPARATOR) >> 8,
312 );
313 body_bytes = compression_type.decompression(&body_bytes)
314 }
315 msg_ext.message.body = Some(body_bytes);
316 } else {
317 let _ = byte_buffer.split_to(
318 BORN_TIMESTAMP_POSITION
319 + born_host_ip_length
320 + 4
321 + 8
322 + store_host_ip_length
323 + 4
324 + 4
325 + 8
326 + 4
327 + body_len as usize,
328 );
329 }
330 }
331
332 let topic_len = version.get_topic_length(byte_buffer);
334 let mut topic = vec![0; topic_len];
335 byte_buffer.copy_to_slice(&mut topic);
336 let topic_str = str::from_utf8(&topic).unwrap();
337 msg_ext.message.topic = CheetahString::from_slice(topic_str);
338
339 let properties_length = byte_buffer.get_i16();
341 if properties_length > 0 {
342 let mut properties = vec![0; properties_length as usize];
344 byte_buffer.copy_to_slice(&mut properties);
345 if !is_set_properties_string {
346 let properties_string = CheetahString::from_string(
348 String::from_utf8_lossy(properties.as_slice()).to_string(),
349 );
350 let message_properties = string_to_message_properties(Some(&properties_string));
351 msg_ext.message.properties = message_properties;
352 } else {
353 let properties_string = CheetahString::from_string(
354 String::from_utf8_lossy(properties.as_slice()).to_string(),
355 );
356 let mut message_properties = string_to_message_properties(Some(&properties_string));
357 message_properties.insert(
358 CheetahString::from_static_str("propertiesString"),
359 properties_string,
360 );
361 msg_ext.message.properties = message_properties;
362 }
363 }
364 let msg_id = build_message_id(store_host_address, physic_offset);
365 msg_ext.set_msg_id(CheetahString::from_string(msg_id));
366
367 if is_client {
368 unimplemented!()
369 }
370
371 Some(msg_ext)
372}
373
374pub fn count_inner_msg_num(bytes: Option<Bytes>) -> u32 {
375 match bytes {
376 None => 0,
377 Some(mut bytes) => {
378 let mut count = 0;
379 while bytes.has_remaining() {
380 let size = bytes.slice(0..4).get_i32();
381 if size as usize > bytes.len() {
382 break;
383 }
384 let _ = bytes.split_to(size as usize);
385 count += 1;
386 }
387 count
388 }
389 }
390}
391
392pub fn encode_messages(messages: &[Message]) -> Bytes {
393 let mut bytes = BytesMut::new();
394 let mut all_size = 0;
395 for message in messages {
396 let message_bytes = encode_message(message);
397 all_size += message_bytes.len();
398 bytes.put_slice(&message_bytes);
399 }
400 bytes.freeze()
401}
402
403pub fn encode_message(message: &Message) -> Bytes {
404 let body = message.body.as_ref().unwrap();
405 let body_len = body.len();
406 let properties = message_properties_to_string(&message.properties);
407 let properties_bytes = properties.as_bytes();
408 let properties_length = properties_bytes.len();
409
410 let store_size = 4 + 4 + 4 + 4 + 4 + body_len + 2 + properties_length;
416
417 let mut bytes = BytesMut::with_capacity(store_size);
418
419 bytes.put_i32(store_size as i32);
421
422 bytes.put_i32(0);
424
425 bytes.put_u32(0);
427
428 bytes.put_i32(message.flag);
430
431 bytes.put_i32(body_len as i32);
433 bytes.put_slice(body);
434
435 bytes.put_i16(properties_length as i16);
437 bytes.put_slice(properties_bytes);
438
439 bytes.freeze()
440}
441
442pub fn decodes_batch(
443 byte_buffer: &mut Bytes,
444 read_body: bool,
445 decompress_body: bool,
446) -> Vec<MessageExt> {
447 let mut messages = Vec::new();
448 while byte_buffer.has_remaining() {
449 if let Some(msg_ext) = decode(byte_buffer, read_body, decompress_body, false, false, false)
450 {
451 messages.push(msg_ext);
452 } else {
453 break;
454 }
455 }
456 messages
457}
458
459pub fn decodes_batch_client(
460 byte_buffer: &mut Bytes,
461 read_body: bool,
462 decompress_body: bool,
463) -> Vec<MessageClientExt> {
464 let mut messages = Vec::new();
465 while byte_buffer.has_remaining() {
466 if let Some(msg_ext) = decode_client(byte_buffer, read_body, decompress_body, false, false)
467 {
468 messages.push(msg_ext);
469 } else {
470 break;
471 }
472 }
473 messages
474}
475
476pub fn decode_messages_from(mut message_ext: MessageExt, vec_: &mut Vec<MessageExt>) {
477 let messages = decode_messages(message_ext.message.body.as_mut().unwrap());
478 for message in messages {
479 let mut message_ext_inner = MessageExt {
480 message,
481 ..MessageExt::default()
482 };
483 message_ext_inner.set_topic(message_ext.get_topic().to_owned());
484 message_ext_inner.queue_offset = message_ext.queue_offset;
485 message_ext_inner.queue_id = message_ext.queue_id;
486 message_ext_inner.set_flag(message_ext.get_flag());
487 message_ext_inner.store_host = message_ext.store_host;
491 message_ext_inner.born_host = message_ext.born_host;
492 message_ext_inner.store_timestamp = message_ext.store_timestamp;
493 message_ext_inner.born_timestamp = message_ext.born_timestamp;
494 message_ext_inner.sys_flag = message_ext.sys_flag;
495 message_ext_inner.commit_log_offset = message_ext.commit_log_offset;
496 message_ext_inner.set_wait_store_msg_ok(message_ext.is_wait_store_msg_ok());
497 vec_.push(message_ext_inner);
498 }
499}
500
501pub fn decode_messages(buffer: &mut Bytes) -> Vec<Message> {
502 let mut messages = Vec::new();
503 while buffer.has_remaining() {
504 let message = decode_message(buffer);
505 messages.push(message);
506 }
507 messages
508}
509
510pub fn decode_message(buffer: &mut Bytes) -> Message {
511 let _ = buffer.get_i32();
513
514 let _ = buffer.get_i32();
516
517 let _ = buffer.get_i32();
519
520 let flag = buffer.get_i32();
522
523 let body_len = buffer.get_i32();
525 let body = buffer.split_to(body_len as usize);
526
527 let properties_length = buffer.get_i16();
529 let properties = buffer.split_to(properties_length as usize);
530 let message_properties = str_to_message_properties(Some(str::from_utf8(&properties).unwrap()));
533
534 Message {
535 body: Some(body),
536 properties: message_properties,
537 flag,
538 ..Message::default()
539 }
540}
541
542pub fn decode_message_id(msg_id: &str) -> MessageId {
543 let bytes = util_all::string_to_bytes(msg_id).unwrap();
544 let mut buffer = Bytes::from(bytes);
545 let len = if msg_id.len() == 32 {
546 let mut ip = [0u8; 4];
547 buffer.copy_to_slice(&mut ip);
548 let port = buffer.get_i32();
549 SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
550 } else {
551 let mut ip = [0u8; 16];
552 buffer.copy_to_slice(&mut ip);
553 let port = buffer.get_i32();
554 SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
555 };
556 MessageId {
557 address: len,
558 offset: buffer.get_i64(),
559 }
560}
561
562pub fn encode(
563 message_ext: &MessageExt,
564 need_compress: bool,
565) -> rocketmq_error::RocketMQResult<Bytes> {
566 let body = message_ext.get_body().unwrap();
567 let topic = message_ext.get_topic().as_bytes();
568 let topic_len = topic.len();
569 let properties = message_properties_to_string(message_ext.get_properties());
570 let properties_bytes = properties.as_bytes();
571 let properties_length = properties_bytes.len();
572 let sys_flag = message_ext.sys_flag;
573 let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
574 8
575 } else {
576 20
577 };
578 let store_host_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
579 8
580 } else {
581 20
582 };
583 let new_body = if need_compress
584 && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG
585 {
586 let compressor =
587 CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
588 let compressed_body = compressor.compress(body, 5)?;
589 Some(compressed_body)
590 } else {
591 None
592 };
593 let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
594 let store_size = message_ext.store_size;
595 let mut byte_buffer = if store_size > 0 {
596 BytesMut::with_capacity(store_size as usize)
597 } else {
598 let store_size = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + born_host_length + 8 + store_host_address_length + 4 + 8 + 4 + body_len + 1 + topic_len + 2 + properties_length; BytesMut::with_capacity(store_size)
616 };
617
618 byte_buffer.put_i32(store_size);
620
621 byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
623
624 byte_buffer.put_u32(message_ext.body_crc);
626
627 byte_buffer.put_i32(message_ext.queue_id);
629
630 byte_buffer.put_i32(message_ext.message.flag);
632
633 byte_buffer.put_i64(message_ext.queue_offset);
635
636 byte_buffer.put_i64(message_ext.commit_log_offset);
638
639 byte_buffer.put_i32(message_ext.sys_flag);
641
642 byte_buffer.put_i64(message_ext.born_timestamp);
644
645 let born_host = message_ext.born_host;
648 match born_host {
649 SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
650 SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
651 };
652
653 byte_buffer.put_i32(born_host.port() as i32);
654
655 byte_buffer.put_i64(message_ext.store_timestamp);
657
658 let store_host = message_ext.store_host;
661 match store_host {
662 SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
663 SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
664 };
665
666 byte_buffer.put_i32(store_host.port() as i32);
667
668 byte_buffer.put_i32(message_ext.reconsume_times);
670
671 byte_buffer.put_i64(message_ext.prepared_transaction_offset);
673
674 byte_buffer.put_i32(body_len as i32);
676 if let Some(new_body) = new_body {
677 byte_buffer.put_slice(&new_body);
678 } else {
679 byte_buffer.put_slice(body);
680 }
681
682 byte_buffer.put_u8(topic_len as u8);
684 byte_buffer.put_slice(topic);
685
686 byte_buffer.put_i16(properties_length as i16);
688 byte_buffer.put_slice(properties_bytes);
689
690 Ok(byte_buffer.freeze())
691}
692
693pub fn encode_uniquely(
694 message_ext: &MessageExt,
695 need_compress: bool,
696) -> rocketmq_error::RocketMQResult<Bytes> {
697 let body = message_ext.get_body().unwrap();
698 let topics = message_ext.get_topic().as_bytes();
699 let topic_len = topics.len();
700 let properties = message_properties_to_string(message_ext.get_properties());
701 let properties_bytes = properties.as_bytes();
702 let properties_length = properties_bytes.len();
703 let sys_flag = message_ext.sys_flag;
704 let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
705 8
706 } else {
707 20
708 };
709 let new_body = if need_compress
710 && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG
711 {
712 let compressor =
713 CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
714 let compressed_body = compressor.compress(body, 5)?;
715 Some(compressed_body)
716 } else {
717 None
718 };
719 let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
720 let store_size = message_ext.store_size;
721 let mut byte_buffer = if store_size > 0 {
722 BytesMut::with_capacity((store_size - 8) as usize)
723 } else {
724 let store_size = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + born_host_length + 4 + 8 + 4 + body_len + 1 + topic_len + 2 + properties_length; BytesMut::with_capacity(store_size)
740 };
741
742 byte_buffer.put_i32(store_size);
744
745 byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
747
748 byte_buffer.put_u32(message_ext.body_crc);
750
751 byte_buffer.put_i32(message_ext.queue_id);
753
754 byte_buffer.put_i32(message_ext.message.flag);
756
757 byte_buffer.put_i64(message_ext.queue_offset);
759
760 byte_buffer.put_i64(message_ext.commit_log_offset);
762
763 byte_buffer.put_i32(message_ext.sys_flag);
765
766 byte_buffer.put_i64(message_ext.born_timestamp);
768
769 let born_host = message_ext.born_host;
772 match born_host {
773 SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
774 SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
775 };
776 byte_buffer.put_i32(born_host.port() as i32);
777
778 byte_buffer.put_i32(message_ext.reconsume_times);
780
781 byte_buffer.put_i64(message_ext.prepared_transaction_offset);
783
784 byte_buffer.put_i32(body_len as i32);
786 if let Some(new_body) = new_body {
787 byte_buffer.put_slice(&new_body);
788 } else {
789 byte_buffer.put_slice(body);
790 }
791
792 byte_buffer.put_i16(topic_len as i16);
794 byte_buffer.put_slice(topics);
795
796 byte_buffer.put_i16(properties_length as i16);
798 byte_buffer.put_slice(properties_bytes);
799
800 Ok(byte_buffer.freeze())
801}
802
803pub fn create_crc32(mut input: &mut [u8], crc32: u32) {
804 input.put(MessageConst::PROPERTY_CRC32.as_bytes());
805 input.put_u8(NAME_VALUE_SEPARATOR as u8);
806 let mut crc32 = crc32;
807 for _ in 0..10 {
808 let mut b = b'0';
809 if crc32 > 0 {
810 b += (crc32 % 10) as u8;
811 crc32 /= 10;
812 }
813 input.put_u8(b);
814 }
815 input.put_u8(PROPERTY_SEPARATOR as u8);
816}
817
818pub fn decode_properties(bytes: &mut Bytes) -> Option<HashMap<CheetahString, CheetahString>> {
819 if bytes.len() < SYSFLAG_POSITION + 4 {
821 return None;
822 }
823
824 let sys_flag = BigEndian::read_i32(&bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4]);
826 let magic_code =
827 BigEndian::read_i32(&bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4]);
828 let version = match MessageVersion::value_of_magic_code(magic_code) {
829 Ok(value) => value,
830 Err(_) => return None,
831 };
832
833 let bornhost_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
835 8
836 } else {
837 20
838 };
839 let storehost_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
840 8
841 } else {
842 20
843 };
844
845 let body_size_position = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhost_length + 8 + storehost_address_length + 4 + 8; if bytes.len() < body_size_position + 4 {
862 return None;
863 }
864
865 let body_size =
867 BigEndian::read_i32(&bytes[body_size_position..body_size_position + 4]) as usize;
868
869 let topic_length_position = body_size_position + 4 + body_size;
871 if bytes.len() < topic_length_position {
872 return None;
873 }
874
875 let slice = &bytes[topic_length_position..];
877 let cursor = Cursor::new(slice);
878 let topic_length_size = version.get_topic_length_size();
879 bytes.advance(topic_length_position);
880 let topic_length = version.get_topic_length(bytes);
881
882 let properties_position = topic_length_position + topic_length_size + topic_length;
884 if bytes.len() < properties_position + 2 {
885 return None;
886 }
887
888 let properties_length =
890 BigEndian::read_i16(&bytes[properties_position..properties_position + 2]);
891
892 let properties_start = properties_position + 2;
894 if properties_length > 0 {
895 let end = properties_start + (properties_length as usize);
896 if bytes.len() < end {
897 return None;
898 }
899 let properties_bytes = &bytes[properties_start..end];
900 if let Ok(properties_string) = String::from_utf8(properties_bytes.to_vec()) {
901 Some(string_to_message_properties(Some(
902 &CheetahString::from_string(properties_string),
903 )))
904 } else {
905 None
906 }
907 } else {
908 None
909 }
910}
911
912#[cfg(test)]
913mod tests {
914 use bytes::BufMut;
915 use bytes::BytesMut;
916
917 use super::*;
918
919 #[test]
920 fn count_inner_msg_num_counts_correctly_for_multiple_messages() {
921 let mut bytes = BytesMut::new();
922 bytes.put_i32(8);
923 bytes.put_slice(&[0, 0, 0, 0]);
924 bytes.put_i32(8);
925 bytes.put_slice(&[0, 0, 0, 0]);
926 assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 2);
927 }
928
929 #[test]
930 fn count_inner_msg_num_counts_correctly_for_single_message() {
931 let mut bytes = BytesMut::new();
932 bytes.put_i32(8);
933 bytes.put_slice(&[0, 0, 0, 0]);
934 assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 1);
935 }
936
937 #[test]
938 fn count_inner_msg_num_counts_zero_for_no_messages() {
939 let bytes = BytesMut::new();
940 assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 0);
941 }
942
943 #[test]
944 fn count_inner_msg_num_ignores_incomplete_messages() {
945 let mut bytes = BytesMut::new();
946 bytes.put_i32(4);
947 assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 1);
948 }
949
950 #[test]
951 fn decode_message_id_ipv4() {
952 let msg_id = "7F0000010007D8260BF075769D36C348";
953 let message_id = decode_message_id(msg_id);
954 assert_eq!(message_id.address, "127.0.0.1:55334".parse().unwrap());
955 assert_eq!(message_id.offset, 860316681131967304);
956 }
957
958 #[test]
959 fn encode_with_compression() {
960 let mut message_ext = MessageExt::default();
961 message_ext.set_body(Bytes::from("Hello, World!"));
962 let result = encode(&message_ext, true);
963 assert!(result.is_ok());
964 let bytes = result.unwrap();
965 assert!(!bytes.is_empty());
966 }
967
968 #[test]
969 fn encode_without_compression() {
970 let mut message_ext = MessageExt::default();
971 message_ext.set_body(Bytes::from("Hello, World!"));
972 let result = encode(&message_ext, false);
973 assert!(result.is_ok());
974 let bytes = result.unwrap();
975 assert!(!bytes.is_empty());
976 }
977
978 #[test]
979 fn encode_with_empty_body() {
980 let mut message_ext = MessageExt::default();
981 message_ext.set_body(Bytes::new());
982 let result = encode(&message_ext, false);
983 assert!(result.is_ok());
984 let bytes = result.unwrap();
985 assert!(!bytes.is_empty());
986 }
987
988 #[test]
989 fn encode_with_large_body() {
990 let mut message_ext = MessageExt::default();
991 let large_body = vec![0u8; 1024 * 1024];
992 message_ext.set_body(Bytes::from(large_body));
993 let result = encode(&message_ext, false);
994 assert!(result.is_ok());
995 let bytes = result.unwrap();
996 assert!(!bytes.is_empty());
997 }
998
999 #[test]
1000 fn encode_uniquely_with_compression() {
1001 let mut message_ext = MessageExt::default();
1002 message_ext.set_body(Bytes::from("Hello, World!"));
1003 let result = encode_uniquely(&message_ext, true);
1004 assert!(result.is_ok());
1005 let bytes = result.unwrap();
1006 assert!(!bytes.is_empty());
1007 }
1008
1009 #[test]
1010 fn encode_uniquely_without_compression() {
1011 let mut message_ext = MessageExt::default();
1012 message_ext.set_body(Bytes::from("Hello, World!"));
1013 let result = encode_uniquely(&message_ext, false);
1014 assert!(result.is_ok());
1015 let bytes = result.unwrap();
1016 assert!(!bytes.is_empty());
1017 }
1018
1019 #[test]
1020 fn encode_uniquely_with_empty_body() {
1021 let mut message_ext = MessageExt::default();
1022 message_ext.set_body(Bytes::new());
1023 let result = encode_uniquely(&message_ext, false);
1024 assert!(result.is_ok());
1025 let bytes = result.unwrap();
1026 assert!(!bytes.is_empty());
1027 }
1028
1029 #[test]
1030 fn encode_uniquely_with_large_body() {
1031 let mut message_ext = MessageExt::default();
1032 let large_body = vec![0u8; 1024 * 1024];
1033 message_ext.set_body(Bytes::from(large_body));
1034 let result = encode_uniquely(&message_ext, false);
1035 assert!(result.is_ok());
1036 let bytes = result.unwrap();
1037 assert!(!bytes.is_empty());
1038 }
1039
1040 #[test]
1041 fn decode_properties_returns_none_if_bytes_length_is_insufficient() {
1042 let mut bytes = Bytes::from(vec![0; SYSFLAG_POSITION + 3]);
1043 assert!(decode_properties(&mut bytes).is_none());
1044 }
1045
1046 #[test]
1047 fn decode_properties_returns_none_if_magic_code_is_invalid() {
1048 let mut bytes =
1049 BytesMut::from_iter(vec![
1050 0u8;
1051 SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
1052 ]);
1053 BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1054 BigEndian::write_i32(
1055 &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1056 -1,
1057 );
1058 assert!(decode_properties(&mut bytes.freeze()).is_none());
1059 }
1060
1061 #[test]
1062 fn decode_properties_returns_none_if_body_size_is_insufficient() {
1063 let mut bytes = BytesMut::from_iter(vec![
1064 0u8;
1065 SYSFLAG_POSITION
1066 + 4
1067 + MESSAGE_MAGIC_CODE_POSITION
1068 + 4
1069 + 4
1070 ]);
1071 BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1072 BigEndian::write_i32(
1073 &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1074 MESSAGE_MAGIC_CODE,
1075 );
1076 assert!(decode_properties(&mut bytes.freeze()).is_none());
1077 }
1078
1079 #[test]
1080 fn decode_properties_returns_none_if_topic_length_is_insufficient() {
1081 let mut bytes = BytesMut::from_iter(vec![
1082 0u8;
1083 SYSFLAG_POSITION
1084 + 4
1085 + MESSAGE_MAGIC_CODE_POSITION
1086 + 4
1087 + 4
1088 + 4
1089 ]);
1090 BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1091 BigEndian::write_i32(
1092 &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1093 MESSAGE_MAGIC_CODE,
1094 );
1095 BigEndian::write_i32(
1096 &mut bytes[SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
1097 ..SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4],
1098 0,
1099 );
1100 assert!(decode_properties(&mut bytes.freeze()).is_none());
1101 }
1102
1103 #[test]
1104 fn decode_properties_returns_none_if_properties_length_is_insufficient() {
1105 let mut bytes = BytesMut::from_iter(vec![
1106 0u8;
1107 SYSFLAG_POSITION
1108 + 4
1109 + MESSAGE_MAGIC_CODE_POSITION
1110 + 4
1111 + 4
1112 + 4
1113 + 2
1114 ]);
1115 BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1116 BigEndian::write_i32(
1117 &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1118 MESSAGE_MAGIC_CODE,
1119 );
1120 BigEndian::write_i32(
1121 &mut bytes[SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
1122 ..SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4],
1123 0,
1124 );
1125 BigEndian::write_i16(
1126 &mut bytes[SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4 + 4
1127 ..SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4 + 4 + 2],
1128 1,
1129 );
1130 assert!(decode_properties(&mut bytes.freeze()).is_none());
1131 }
1132}