rocketmq_common/common/message/
message_decoder.rs1use std::collections::HashMap;
16use std::io::Cursor;
17use std::io::Write;
18use std::net::IpAddr;
19use std::net::Ipv4Addr;
20use std::net::Ipv6Addr;
21use std::net::SocketAddr;
22use std::net::SocketAddrV4;
23use std::net::SocketAddrV6;
24use std::str;
25
26use byteorder::BigEndian;
27use byteorder::ByteOrder;
28use bytes::Buf;
29use bytes::BufMut;
30use bytes::Bytes;
31use bytes::BytesMut;
32use cheetah_string::CheetahString;
33
34use crate::common::compression::compression_type::CompressionType;
35use crate::common::compression::compressor_factory::CompressorFactory;
36use crate::common::message::message_client_ext::MessageClientExt;
37use crate::common::message::message_ext::MessageExt;
38use crate::common::message::message_id::MessageId;
39use crate::common::message::message_property::MessageProperties;
40use crate::common::message::message_single::Message;
41use crate::common::message::MessageConst;
42use crate::common::message::MessageTrait;
43use crate::common::message::MessageVersion;
44use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
45use crate::utils::util_all;
46use crate::CRC32Utils::crc32;
47use crate::MessageAccessor::MessageAccessor;
48use crate::MessageUtils::build_message_id;
49
/// Name of the character set used for textual message fields.
pub const CHARSET_UTF8: &str = "UTF-8";

// Byte offsets below are measured from the start of an encoded message, whose leading
// fields are: total size (4), magic code (4), body CRC (4), queue id (4), flag (4),
// queue offset (8), physical offset (8), sys flag (4), born timestamp (8).

/// Offset of the 4-byte magic code.
pub const MESSAGE_MAGIC_CODE_POSITION: usize = 4;
/// Offset of the 4-byte message flag.
pub const MESSAGE_FLAG_POSITION: usize = 16;
/// Offset of the 8-byte physical (commit log) offset.
pub const MESSAGE_PHYSIC_OFFSET_POSITION: usize = 28;
/// Offset of the 8-byte store timestamp when the born host is an IPv4 address
/// (4-byte address plus 4-byte port).
pub const MESSAGE_STORE_TIMESTAMP_POSITION: usize = 56;

/// Magic code written at [`MESSAGE_MAGIC_CODE_POSITION`] by the encoders in this module.
pub const MESSAGE_MAGIC_CODE: i32 = -626843481;
/// Alternative magic code; presumably marks the second message layout version
/// (resolved through `MessageVersion::value_of_magic_code`) - TODO confirm.
pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;
/// Magic code for blank records; not referenced in this module - presumably used by the
/// storage layer, TODO confirm.
pub const BLANK_MAGIC_CODE: i32 = -875286124;

/// Separates a property name from its value in the serialized property string.
pub const NAME_VALUE_SEPARATOR: char = '\u{0001}';
/// Terminates one `name`/`value` pair in the serialized property string.
pub const PROPERTY_SEPARATOR: char = '\u{0002}';

/// Offset of the 8-byte queue offset (directly after the flag).
pub const QUEUE_OFFSET_POSITION: usize = MESSAGE_FLAG_POSITION + 4;
/// Offset of the 8-byte physical offset (directly after the queue offset).
pub const PHY_POS_POSITION: usize = QUEUE_OFFSET_POSITION + 8;
/// Offset of the 4-byte sys flag (directly after the physical offset).
pub const SYSFLAG_POSITION: usize = PHY_POS_POSITION + 8;
/// Offset just past the sys flag and the 8-byte born timestamp, i.e. where the born host
/// starts. NOTE(review): despite the name this is not the offset of the born timestamp
/// itself; the value is kept unchanged.
pub const BORN_TIMESTAMP_POSITION: usize = SYSFLAG_POSITION + 4 + 8;
64
65pub fn string_to_message_properties(properties: Option<&CheetahString>) -> HashMap<CheetahString, CheetahString> {
66 let mut map = HashMap::new();
67 if let Some(properties) = properties {
68 let mut index = 0;
69 let len = properties.len();
70 while index < len {
71 let new_index = properties[index..].find(PROPERTY_SEPARATOR).map_or(len, |i| index + i);
72 if new_index - index >= 3 {
73 if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR) {
74 let kv_sep_index = index + kv_sep_index;
75 if kv_sep_index > index && kv_sep_index < new_index - 1 {
76 let k = &properties[index..kv_sep_index];
77 let v = &properties[kv_sep_index + 1..new_index];
78 map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
79 }
80 }
81 }
82 index = new_index + 1;
83 }
84 }
85 map
86}
87
88pub fn str_to_message_properties(properties: Option<&str>) -> HashMap<CheetahString, CheetahString> {
89 let mut map = HashMap::new();
90 if let Some(properties) = properties {
91 let mut index = 0;
92 let len = properties.len();
93 while index < len {
94 let new_index = properties[index..].find(PROPERTY_SEPARATOR).map_or(len, |i| index + i);
95 if new_index - index >= 3 {
96 if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR) {
97 let kv_sep_index = index + kv_sep_index;
98 if kv_sep_index > index && kv_sep_index < new_index - 1 {
99 let k = &properties[index..kv_sep_index];
100 let v = &properties[kv_sep_index + 1..new_index];
101 map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
102 }
103 }
104 }
105 index = new_index + 1;
106 }
107 }
108 map
109}
110
111pub fn message_properties_to_string(properties: &HashMap<CheetahString, CheetahString>) -> CheetahString {
112 let mut len = 0;
113 for (name, value) in properties.iter() {
114 len += name.len();
115
116 len += value.len();
117 len += 2; }
119
120 let mut sb = String::with_capacity(len);
121 for (name, value) in properties.iter() {
122 sb.push_str(name);
123 sb.push(NAME_VALUE_SEPARATOR);
124
125 sb.push_str(value);
126 sb.push(PROPERTY_SEPARATOR);
127 }
128 CheetahString::from_string(sb)
129}
130
131pub fn decode_client(
132 byte_buffer: &mut Bytes,
133 read_body: bool,
134 de_compress_body: bool,
135 is_set_properties_string: bool,
136 check_crc: bool,
137) -> Option<MessageClientExt> {
138 decode(
153 byte_buffer,
154 read_body,
155 de_compress_body,
156 false,
157 is_set_properties_string,
158 check_crc,
159 )
160 .map(|msg_ext| MessageClientExt {
161 message_ext_inner: msg_ext,
162 })
163}
164
165pub fn decode(
167 byte_buffer: &mut Bytes,
168 read_body: bool,
169 de_compress_body: bool,
170 is_client: bool,
171 is_set_properties_string: bool,
172 check_crc: bool,
173) -> Option<MessageExt> {
174 let mut msg_ext = if is_client {
175 unimplemented!()
176 } else {
177 MessageExt::default()
178 };
179
180 let store_size = byte_buffer.get_i32();
182 msg_ext.set_store_size(store_size);
183
184 let magic_code = byte_buffer.get_i32();
186 let version = MessageVersion::value_of_magic_code(magic_code).unwrap();
187
188 let body_crc = byte_buffer.get_u32();
190 msg_ext.set_body_crc(body_crc);
191
192 let queue_id = byte_buffer.get_i32();
194 msg_ext.set_queue_id(queue_id);
195
196 let flag = byte_buffer.get_i32();
198 msg_ext.message.set_flag(flag);
199
200 let queue_offset = byte_buffer.get_i64();
202 msg_ext.set_queue_offset(queue_offset);
203
204 let physic_offset = byte_buffer.get_i64();
206 msg_ext.set_commit_log_offset(physic_offset);
207
208 let sys_flag = byte_buffer.get_i32();
210 msg_ext.set_sys_flag(sys_flag);
211
212 let born_time_stamp = byte_buffer.get_i64();
214 msg_ext.set_born_timestamp(born_time_stamp);
215
216 let (born_host_address, born_host_ip_length) = if sys_flag & MessageSysFlag::BORNHOST_V6_FLAG != 0 {
218 let mut born_host = [0; 16];
219 byte_buffer.copy_to_slice(&mut born_host);
220 let port = byte_buffer.get_i32();
221 (
222 SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(born_host), port as u16, 0, 0)),
223 16,
224 )
225 } else {
226 let mut born_host = [0; 4];
227 byte_buffer.copy_to_slice(&mut born_host);
228 let port = byte_buffer.get_i32();
229 (
230 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(born_host), port as u16)),
231 4,
232 )
233 };
234 msg_ext.set_born_host(born_host_address);
235
236 let store_timestamp = byte_buffer.get_i64();
238 msg_ext.set_store_timestamp(store_timestamp);
239
240 let (store_host_address, store_host_ip_length) = if sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG != 0 {
242 let mut store_host = [0; 16];
243 byte_buffer.copy_to_slice(&mut store_host);
244 let port = byte_buffer.get_i32();
245 (
246 SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(store_host), port as u16, 0, 0)),
247 16,
248 )
249 } else {
250 let mut store_host = [0; 4];
251 byte_buffer.copy_to_slice(&mut store_host);
252 let port = byte_buffer.get_i32();
253 (
254 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(store_host), port as u16)),
255 4,
256 )
257 };
258 msg_ext.set_store_host(store_host_address);
259
260 let reconsume_times = byte_buffer.get_i32();
262 msg_ext.set_reconsume_times(reconsume_times);
263
264 let prepared_transaction_offset = byte_buffer.get_i64();
266 msg_ext.set_prepared_transaction_offset(prepared_transaction_offset);
267
268 let body_len = byte_buffer.get_i32();
270 if body_len > 0 {
271 if read_body {
273 let mut body = vec![0; body_len as usize];
274 byte_buffer.copy_to_slice(&mut body);
275 if check_crc {
276 let crc = crc32(&body);
277 if crc != body_crc {
278 return None;
279 }
280 }
281 let mut body_bytes = Bytes::from(body);
282 if de_compress_body && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG {
283 let compression_type =
284 CompressionType::find_by_value((flag & MessageSysFlag::COMPRESSION_TYPE_COMPARATOR) >> 8);
285 body_bytes = compression_type.decompression(&body_bytes)
286 }
287 msg_ext.message.set_body(Some(body_bytes));
288 } else {
289 let _ = byte_buffer.split_to(
290 BORN_TIMESTAMP_POSITION
291 + born_host_ip_length
292 + 4
293 + 8
294 + store_host_ip_length
295 + 4
296 + 4
297 + 8
298 + 4
299 + body_len as usize,
300 );
301 }
302 }
303
304 let topic_len = version.get_topic_length(byte_buffer);
306 let mut topic = vec![0; topic_len];
307 byte_buffer.copy_to_slice(&mut topic);
308 let topic_str = str::from_utf8(&topic).unwrap();
309 msg_ext.message.set_topic(CheetahString::from_slice(topic_str));
310
311 let properties_length = byte_buffer.get_i16();
313 if properties_length > 0 {
314 let mut properties = vec![0; properties_length as usize];
316 byte_buffer.copy_to_slice(&mut properties);
317 if !is_set_properties_string {
318 let properties_string =
320 CheetahString::from_string(String::from_utf8_lossy(properties.as_slice()).to_string());
321 let message_properties = string_to_message_properties(Some(&properties_string));
322 *msg_ext.message.properties_mut() = MessageProperties::from_map(message_properties);
323 } else {
324 let properties_string =
325 CheetahString::from_string(String::from_utf8_lossy(properties.as_slice()).to_string());
326 let mut message_properties = string_to_message_properties(Some(&properties_string));
327 message_properties.insert(CheetahString::from_static_str("propertiesString"), properties_string);
328 *msg_ext.message.properties_mut() = MessageProperties::from_map(message_properties);
329 }
330 }
331 let msg_id = build_message_id(store_host_address, physic_offset);
332 msg_ext.set_msg_id(CheetahString::from_string(msg_id));
333
334 if is_client {
335 unimplemented!()
336 }
337
338 Some(msg_ext)
339}
340
341pub fn count_inner_msg_num(bytes: Option<Bytes>) -> u32 {
342 match bytes {
343 None => 0,
344 Some(mut bytes) => {
345 let mut count = 0;
346 while bytes.has_remaining() {
347 let size = bytes.slice(0..4).get_i32();
348 if size as usize > bytes.len() {
349 break;
350 }
351 let _ = bytes.split_to(size as usize);
352 count += 1;
353 }
354 count
355 }
356 }
357}
358
359pub fn encode_messages(messages: &[Message]) -> Bytes {
360 let mut bytes = BytesMut::new();
361 for message in messages {
363 let message_bytes = encode_message(message);
364 bytes.put_slice(&message_bytes);
366 }
367 bytes.freeze()
368}
369
370pub fn encode_message(message: &Message) -> Bytes {
371 let body = message.get_body().unwrap();
372 let body_len = body.len();
373 let properties = message_properties_to_string(message.properties().as_map());
374 let properties_bytes = properties.as_bytes();
375 let properties_length = properties_bytes.len();
376
377 let store_size = 4 + 4 + 4 + 4 + 4 + body_len + 2 + properties_length;
383
384 let mut bytes = BytesMut::with_capacity(store_size);
385
386 bytes.put_i32(store_size as i32);
388
389 bytes.put_i32(0);
391
392 bytes.put_u32(0);
394
395 bytes.put_i32(message.flag());
397
398 bytes.put_i32(body_len as i32);
400 bytes.put_slice(body);
401
402 bytes.put_i16(properties_length as i16);
404 bytes.put_slice(properties_bytes);
405
406 bytes.freeze()
407}
408
409pub fn decodes_batch(byte_buffer: &mut Bytes, read_body: bool, decompress_body: bool) -> Vec<MessageExt> {
410 let mut messages = Vec::new();
411 while byte_buffer.has_remaining() {
412 if let Some(msg_ext) = decode(byte_buffer, read_body, decompress_body, false, false, false) {
413 messages.push(msg_ext);
414 } else {
415 break;
416 }
417 }
418 messages
419}
420
421pub fn decodes_batch_client(byte_buffer: &mut Bytes, read_body: bool, decompress_body: bool) -> Vec<MessageClientExt> {
422 let mut messages = Vec::new();
423 while byte_buffer.has_remaining() {
424 if let Some(msg_ext) = decode_client(byte_buffer, read_body, decompress_body, false, false) {
425 messages.push(msg_ext);
426 } else {
427 break;
428 }
429 }
430 messages
431}
432
433pub fn decode_messages_from(mut message_ext: MessageExt, vec_: &mut Vec<MessageExt>) {
434 let messages = decode_messages(message_ext.message.body_mut().raw_mut().as_mut().unwrap());
435 for message in messages {
436 let mut message_ext_inner = MessageExt {
437 message,
438 ..MessageExt::default()
439 };
440 message_ext_inner.set_topic(message_ext.topic().to_owned());
441 message_ext_inner.queue_offset = message_ext.queue_offset;
442 message_ext_inner.queue_id = message_ext.queue_id;
443 message_ext_inner.set_flag(message_ext.get_flag());
444 message_ext_inner.store_host = message_ext.store_host;
448 message_ext_inner.born_host = message_ext.born_host;
449 message_ext_inner.store_timestamp = message_ext.store_timestamp;
450 message_ext_inner.born_timestamp = message_ext.born_timestamp;
451 message_ext_inner.sys_flag = message_ext.sys_flag;
452 message_ext_inner.commit_log_offset = message_ext.commit_log_offset;
453 message_ext_inner.set_wait_store_msg_ok(message_ext.is_wait_store_msg_ok());
454 vec_.push(message_ext_inner);
455 }
456}
457
458pub fn decode_messages(buffer: &mut Bytes) -> Vec<Message> {
459 let mut messages = Vec::new();
460 while buffer.has_remaining() {
461 let message = decode_message(buffer);
462 messages.push(message);
463 }
464 messages
465}
466
467pub fn decode_message(buffer: &mut Bytes) -> Message {
468 let _ = buffer.get_i32();
470
471 let _ = buffer.get_i32();
473
474 let _ = buffer.get_i32();
476
477 let flag = buffer.get_i32();
479
480 let body_len = buffer.get_i32();
482 let body = buffer.split_to(body_len as usize);
483
484 let properties_length = buffer.get_i16();
486 let properties = buffer.split_to(properties_length as usize);
487 let message_properties = str_to_message_properties(Some(str::from_utf8(&properties).unwrap()));
490
491 let mut message = Message::default();
492 message.set_body(Some(body));
493 message.set_properties(message_properties);
494 message.set_flag(flag);
495 message
496}
497
/// Length in hex characters of a message id carrying an IPv4 address:
/// 4-byte address, 4-byte port and 8-byte offset.
const MSG_ID_IPV4_LEN: usize = 32;
/// Length in hex characters of a message id carrying an IPv6 address:
/// 16-byte address, 4-byte port and 8-byte offset.
const MSG_ID_IPV6_LEN: usize = 56;

/// Checks that `msg_id` has the shape of an encoded message id.
///
/// Surrounding whitespace is ignored. The id must be non-empty, exactly
/// [`MSG_ID_IPV4_LEN`] or [`MSG_ID_IPV6_LEN`] characters long and consist solely of ASCII
/// hexadecimal digits.
///
/// # Errors
/// Returns a human-readable description of the first rule that is violated.
pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
    let candidate = msg_id.trim();

    if candidate.is_empty() {
        return Err("Message ID cannot be empty".to_string());
    }

    match candidate.len() {
        MSG_ID_IPV4_LEN | MSG_ID_IPV6_LEN => {}
        len => {
            return Err(format!(
                "Invalid message ID length: {}. Expected {} characters (IPv4) or {} characters (IPv6)",
                len, MSG_ID_IPV4_LEN, MSG_ID_IPV6_LEN
            ));
        }
    }

    if candidate.chars().any(|c| !c.is_ascii_hexdigit()) {
        return Err("Message ID must be a valid hexadecimal string".to_string());
    }

    Ok(())
}
522
523pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
524 validate_message_id(msg_id)?;
525 let bytes = util_all::string_to_bytes(msg_id)
526 .ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
527 let mut buffer = Bytes::from(bytes);
528 let address = if msg_id.len() == 32 {
529 let mut ip = [0u8; 4];
530 buffer.copy_to_slice(&mut ip);
531 let port = buffer.get_i32();
532 SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
533 } else {
534 let mut ip = [0u8; 16];
535 buffer.copy_to_slice(&mut ip);
536 let port = buffer.get_i32();
537 SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
538 };
539 Ok(MessageId {
540 address,
541 offset: buffer.get_i64(),
542 })
543}
544
545pub fn encode(message_ext: &MessageExt, need_compress: bool) -> rocketmq_error::RocketMQResult<Bytes> {
546 let body = message_ext.get_body().unwrap();
547 let topic = message_ext.topic().as_bytes();
548 let topic_len = topic.len();
549 let properties = message_properties_to_string(message_ext.get_properties());
550 let properties_bytes = properties.as_bytes();
551 let properties_length = properties_bytes.len();
552 let sys_flag = message_ext.sys_flag;
553 let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
554 8
555 } else {
556 20
557 };
558 let store_host_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
559 8
560 } else {
561 20
562 };
563 let new_body = if need_compress && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG {
564 let compressor = CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
565 let compressed_body = compressor.compress(body, 5)?;
566 Some(compressed_body)
567 } else {
568 None
569 };
570 let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
571 let store_size = message_ext.store_size;
572 let mut byte_buffer = if store_size > 0 {
573 BytesMut::with_capacity(store_size as usize)
574 } else {
575 let store_size = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + born_host_length + 8 + store_host_address_length + 4 + 8 + 4 + body_len + 1 + topic_len + 2 + properties_length; BytesMut::with_capacity(store_size)
593 };
594
595 byte_buffer.put_i32(store_size);
597
598 byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
600
601 byte_buffer.put_u32(message_ext.body_crc);
603
604 byte_buffer.put_i32(message_ext.queue_id);
606
607 byte_buffer.put_i32(message_ext.message.flag());
609
610 byte_buffer.put_i64(message_ext.queue_offset);
612
613 byte_buffer.put_i64(message_ext.commit_log_offset);
615
616 byte_buffer.put_i32(message_ext.sys_flag);
618
619 byte_buffer.put_i64(message_ext.born_timestamp);
621
622 let born_host = message_ext.born_host;
625 match born_host {
626 SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
627 SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
628 };
629
630 byte_buffer.put_i32(born_host.port() as i32);
631
632 byte_buffer.put_i64(message_ext.store_timestamp);
634
635 let store_host = message_ext.store_host;
638 match store_host {
639 SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
640 SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
641 };
642
643 byte_buffer.put_i32(store_host.port() as i32);
644
645 byte_buffer.put_i32(message_ext.reconsume_times);
647
648 byte_buffer.put_i64(message_ext.prepared_transaction_offset);
650
651 byte_buffer.put_i32(body_len as i32);
653 if let Some(new_body) = new_body {
654 byte_buffer.put_slice(&new_body);
655 } else {
656 byte_buffer.put_slice(body);
657 }
658
659 byte_buffer.put_u8(topic_len as u8);
661 byte_buffer.put_slice(topic);
662
663 byte_buffer.put_i16(properties_length as i16);
665 byte_buffer.put_slice(properties_bytes);
666
667 Ok(byte_buffer.freeze())
668}
669
670pub fn encode_uniquely(message_ext: &MessageExt, need_compress: bool) -> rocketmq_error::RocketMQResult<Bytes> {
671 let body = message_ext.get_body().unwrap();
672 let topics = message_ext.topic().as_bytes();
673 let topic_len = topics.len();
674 let properties = message_properties_to_string(message_ext.get_properties());
675 let properties_bytes = properties.as_bytes();
676 let properties_length = properties_bytes.len();
677 let sys_flag = message_ext.sys_flag;
678 let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
679 8
680 } else {
681 20
682 };
683 let new_body = if need_compress && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG {
684 let compressor = CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
685 let compressed_body = compressor.compress(body, 5)?;
686 Some(compressed_body)
687 } else {
688 None
689 };
690 let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
691 let store_size = message_ext.store_size;
692 let mut byte_buffer = if store_size > 0 {
693 BytesMut::with_capacity((store_size - 8) as usize)
694 } else {
695 let store_size = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + born_host_length + 4 + 8 + 4 + body_len + 1 + topic_len + 2 + properties_length; BytesMut::with_capacity(store_size)
711 };
712
713 byte_buffer.put_i32(store_size);
715
716 byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
718
719 byte_buffer.put_u32(message_ext.body_crc);
721
722 byte_buffer.put_i32(message_ext.queue_id);
724
725 byte_buffer.put_i32(message_ext.message.flag());
727
728 byte_buffer.put_i64(message_ext.queue_offset);
730
731 byte_buffer.put_i64(message_ext.commit_log_offset);
733
734 byte_buffer.put_i32(message_ext.sys_flag);
736
737 byte_buffer.put_i64(message_ext.born_timestamp);
739
740 let born_host = message_ext.born_host;
743 match born_host {
744 SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
745 SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
746 };
747 byte_buffer.put_i32(born_host.port() as i32);
748
749 byte_buffer.put_i32(message_ext.reconsume_times);
751
752 byte_buffer.put_i64(message_ext.prepared_transaction_offset);
754
755 byte_buffer.put_i32(body_len as i32);
757 if let Some(new_body) = new_body {
758 byte_buffer.put_slice(&new_body);
759 } else {
760 byte_buffer.put_slice(body);
761 }
762
763 byte_buffer.put_i16(topic_len as i16);
765 byte_buffer.put_slice(topics);
766
767 byte_buffer.put_i16(properties_length as i16);
769 byte_buffer.put_slice(properties_bytes);
770
771 Ok(byte_buffer.freeze())
772}
773
774pub fn create_crc32(mut input: &mut [u8], crc32: u32) {
775 input.put(MessageConst::PROPERTY_CRC32.as_bytes());
776 input.put_u8(NAME_VALUE_SEPARATOR as u8);
777 let mut crc32 = crc32;
778 for _ in 0..10 {
779 let mut b = b'0';
780 if crc32 > 0 {
781 b += (crc32 % 10) as u8;
782 crc32 /= 10;
783 }
784 input.put_u8(b);
785 }
786 input.put_u8(PROPERTY_SEPARATOR as u8);
787}
788
789pub fn decode_properties(bytes: &mut Bytes) -> Option<HashMap<CheetahString, CheetahString>> {
790 if bytes.len() < SYSFLAG_POSITION + 4 {
792 return None;
793 }
794
795 let sys_flag = BigEndian::read_i32(&bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4]);
797 let magic_code = BigEndian::read_i32(&bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4]);
798 let version = match MessageVersion::value_of_magic_code(magic_code) {
799 Ok(value) => value,
800 Err(_) => return None,
801 };
802
803 let bornhost_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
805 8
806 } else {
807 20
808 };
809 let storehost_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
810 8
811 } else {
812 20
813 };
814
815 let body_size_position = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhost_length + 8 + storehost_address_length + 4 + 8; if bytes.len() < body_size_position + 4 {
832 return None;
833 }
834
835 let body_size = BigEndian::read_i32(&bytes[body_size_position..body_size_position + 4]) as usize;
837
838 let topic_length_position = body_size_position + 4 + body_size;
840 if bytes.len() < topic_length_position {
841 return None;
842 }
843
844 let slice = &bytes[topic_length_position..];
846 let cursor = Cursor::new(slice);
847 let topic_length_size = version.get_topic_length_size();
848 bytes.advance(topic_length_position);
849 let topic_length = version.get_topic_length(bytes);
850
851 let properties_position = topic_length_position + topic_length_size + topic_length;
853 if bytes.len() < properties_position + 2 {
854 return None;
855 }
856
857 let properties_length = BigEndian::read_i16(&bytes[properties_position..properties_position + 2]);
859
860 let properties_start = properties_position + 2;
862 if properties_length > 0 {
863 let end = properties_start + (properties_length as usize);
864 if bytes.len() < end {
865 return None;
866 }
867 let properties_bytes = &bytes[properties_start..end];
868 if let Ok(properties_string) = String::from_utf8(properties_bytes.to_vec()) {
869 Some(string_to_message_properties(Some(&CheetahString::from_string(
870 properties_string,
871 ))))
872 } else {
873 None
874 }
875 } else {
876 None
877 }
878}
879
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use bytes::BytesMut;

    use super::*;

    /// Offset used by the `decode_properties` fixtures for their extra fields.
    const FIXTURE_BASE: usize = SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4;

    /// Builds a buffer of `count` entries, each a size field of 8 followed by four zero bytes.
    fn packed_messages(count: usize) -> BytesMut {
        let mut bytes = BytesMut::new();
        for _ in 0..count {
            bytes.put_i32(8);
            bytes.put_slice(&[0, 0, 0, 0]);
        }
        bytes
    }

    /// Builds a default message carrying `body`.
    fn message_with_body(body: Bytes) -> MessageExt {
        let mut message_ext = MessageExt::default();
        message_ext.set_body(body);
        message_ext
    }

    /// Asserts that an encoding succeeded and produced at least one byte.
    fn assert_encoded(result: rocketmq_error::RocketMQResult<Bytes>) {
        assert!(result.is_ok());
        let bytes = result.unwrap();
        assert!(!bytes.is_empty());
    }

    /// Builds a zero-filled frame of `len` bytes with a zero sys flag and the given magic code.
    fn zeroed_frame(len: usize, magic_code: i32) -> BytesMut {
        let mut bytes = BytesMut::from_iter(vec![0u8; len]);
        BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
        BigEndian::write_i32(
            &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
            magic_code,
        );
        bytes
    }

    #[test]
    fn count_inner_msg_num_counts_correctly_for_multiple_messages() {
        assert_eq!(count_inner_msg_num(Some(packed_messages(2).freeze())), 2);
    }

    #[test]
    fn count_inner_msg_num_counts_correctly_for_single_message() {
        assert_eq!(count_inner_msg_num(Some(packed_messages(1).freeze())), 1);
    }

    #[test]
    fn count_inner_msg_num_counts_zero_for_no_messages() {
        assert_eq!(count_inner_msg_num(Some(packed_messages(0).freeze())), 0);
    }

    #[test]
    fn count_inner_msg_num_ignores_incomplete_messages() {
        // A lone size field of 4 describes an entry consisting of just that field.
        let mut bytes = BytesMut::new();
        bytes.put_i32(4);
        assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 1);
    }

    #[test]
    fn decode_message_id_ipv4() {
        let message_id = decode_message_id("7F0000010007D8260BF075769D36C348").unwrap();
        assert_eq!(message_id.address, "127.0.0.1:55334".parse().unwrap());
        assert_eq!(message_id.offset, 860316681131967304);
    }

    #[test]
    fn encode_with_compression() {
        let message_ext = message_with_body(Bytes::from("Hello, World!"));
        assert_encoded(encode(&message_ext, true));
    }

    #[test]
    fn encode_without_compression() {
        let message_ext = message_with_body(Bytes::from("Hello, World!"));
        assert_encoded(encode(&message_ext, false));
    }

    #[test]
    fn encode_with_empty_body() {
        let message_ext = message_with_body(Bytes::new());
        assert_encoded(encode(&message_ext, false));
    }

    #[test]
    fn encode_with_large_body() {
        let large_body = vec![0u8; 1024 * 1024];
        let message_ext = message_with_body(Bytes::from(large_body));
        assert_encoded(encode(&message_ext, false));
    }

    #[test]
    fn encode_uniquely_with_compression() {
        let message_ext = message_with_body(Bytes::from("Hello, World!"));
        assert_encoded(encode_uniquely(&message_ext, true));
    }

    #[test]
    fn encode_uniquely_without_compression() {
        let message_ext = message_with_body(Bytes::from("Hello, World!"));
        assert_encoded(encode_uniquely(&message_ext, false));
    }

    #[test]
    fn encode_uniquely_with_empty_body() {
        let message_ext = message_with_body(Bytes::new());
        assert_encoded(encode_uniquely(&message_ext, false));
    }

    #[test]
    fn encode_uniquely_with_large_body() {
        let large_body = vec![0u8; 1024 * 1024];
        let message_ext = message_with_body(Bytes::from(large_body));
        assert_encoded(encode_uniquely(&message_ext, false));
    }

    #[test]
    fn decode_properties_returns_none_if_bytes_length_is_insufficient() {
        let mut bytes = Bytes::from(vec![0; SYSFLAG_POSITION + 3]);
        assert!(decode_properties(&mut bytes).is_none());
    }

    #[test]
    fn decode_properties_returns_none_if_magic_code_is_invalid() {
        let bytes = zeroed_frame(FIXTURE_BASE, -1);
        assert!(decode_properties(&mut bytes.freeze()).is_none());
    }

    #[test]
    fn decode_properties_returns_none_if_body_size_is_insufficient() {
        let bytes = zeroed_frame(FIXTURE_BASE + 4, MESSAGE_MAGIC_CODE);
        assert!(decode_properties(&mut bytes.freeze()).is_none());
    }

    #[test]
    fn decode_properties_returns_none_if_topic_length_is_insufficient() {
        let mut bytes = zeroed_frame(FIXTURE_BASE + 4 + 4, MESSAGE_MAGIC_CODE);
        BigEndian::write_i32(&mut bytes[FIXTURE_BASE..FIXTURE_BASE + 4], 0);
        assert!(decode_properties(&mut bytes.freeze()).is_none());
    }

    #[test]
    fn decode_properties_returns_none_if_properties_length_is_insufficient() {
        let mut bytes = zeroed_frame(FIXTURE_BASE + 4 + 4 + 2, MESSAGE_MAGIC_CODE);
        BigEndian::write_i32(&mut bytes[FIXTURE_BASE..FIXTURE_BASE + 4], 0);
        BigEndian::write_i16(&mut bytes[FIXTURE_BASE + 4 + 4..FIXTURE_BASE + 4 + 4 + 2], 1);
        assert!(decode_properties(&mut bytes.freeze()).is_none());
    }

    #[test]
    fn validate_message_id_ipv4_32_chars() {
        assert!(validate_message_id("AC11000100002A9F0000000000000001").is_ok());
    }

    #[test]
    fn validate_message_id_ipv6_56_chars() {
        assert!(validate_message_id("20010db800000000000000000000000100002A9F0000000000000001").is_ok());
    }

    #[test]
    fn validate_message_id_ipv6_40_chars_rejected() {
        let result = validate_message_id("20010db800000000000000000000000100000001");
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.contains("Invalid message ID length"));
            assert!(e.contains("56 characters (IPv6)"));
        }
    }

    #[test]
    fn decode_message_id_ipv6() {
        let message_id = decode_message_id("20010db800000000000000000000000100002A9F0000000000000001").unwrap();
        assert_eq!(message_id.address, "[2001:db8::1]:10911".parse().unwrap());
        assert_eq!(message_id.offset, 1);
    }

    #[test]
    fn decode_message_id_ipv6_full_address() {
        let message_id = decode_message_id("20010db81234567800000000abcdef0100002A9F0000000000001234").unwrap();
        assert_eq!(
            message_id.address,
            "[2001:db8:1234:5678::abcd:ef01]:10911".parse().unwrap()
        );
        assert_eq!(message_id.offset, 4660);
    }
}