Skip to main content

rocketmq_common/common/message/
message_decoder.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::io::Cursor;
17use std::io::Write;
18use std::net::IpAddr;
19use std::net::Ipv4Addr;
20use std::net::Ipv6Addr;
21use std::net::SocketAddr;
22use std::net::SocketAddrV4;
23use std::net::SocketAddrV6;
24use std::str;
25
26use byteorder::BigEndian;
27use byteorder::ByteOrder;
28use bytes::Buf;
29use bytes::BufMut;
30use bytes::Bytes;
31use bytes::BytesMut;
32use cheetah_string::CheetahString;
33
34use crate::common::compression::compression_type::CompressionType;
35use crate::common::compression::compressor_factory::CompressorFactory;
36use crate::common::message::message_client_ext::MessageClientExt;
37use crate::common::message::message_ext::MessageExt;
38use crate::common::message::message_id::MessageId;
39use crate::common::message::message_property::MessageProperties;
40use crate::common::message::message_single::Message;
41use crate::common::message::MessageConst;
42use crate::common::message::MessageTrait;
43use crate::common::message::MessageVersion;
44use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
45use crate::utils::util_all;
46use crate::CRC32Utils::crc32;
47use crate::MessageAccessor::MessageAccessor;
48use crate::MessageUtils::build_message_id;
49
/// Name of the character set used for textual message fields.
pub const CHARSET_UTF8: &str = "UTF-8";

// Byte offsets into an encoded message, counted from its first byte (the TOTALSIZE field).

/// Offset of the magic code, directly after the 4-byte total size.
pub const MESSAGE_MAGIC_CODE_POSITION: usize = 4;
/// Offset of the message flag (after total size, magic code, body CRC and queue id).
pub const MESSAGE_FLAG_POSITION: usize = 16;
/// Offset of the physical (commit log) offset field.
pub const MESSAGE_PHYSIC_OFFSET_POSITION: usize = 28;
/// Offset of the store timestamp when the born host is encoded with a 4-byte IP plus port.
pub const MESSAGE_STORE_TIMESTAMP_POSITION: usize = 56;

/// Magic code written by `encode` / `encode_uniquely`.
pub const MESSAGE_MAGIC_CODE: i32 = -626843481;
/// Magic code of the second message format version.
pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;
/// Magic code for blank records (not referenced in this part of the file).
pub const BLANK_MAGIC_CODE: i32 = -875286124;

/// Separates a property name from its value in a serialized property string.
pub const NAME_VALUE_SEPARATOR: char = '\u{0001}';
/// Terminates each `name`/`value` pair in a serialized property string.
pub const PROPERTY_SEPARATOR: char = '\u{0002}';

/// Offset of the queue offset: five 4-byte fields precede it.
pub const QUEUE_OFFSET_POSITION: usize = 5 * 4;
/// Offset of the physical offset, right after the 8-byte queue offset.
pub const PHY_POS_POSITION: usize = QUEUE_OFFSET_POSITION + 8;
/// Offset of the system flag, right after the 8-byte physical offset.
pub const SYSFLAG_POSITION: usize = PHY_POS_POSITION + 8;
/// Combined size of all fields up to and including the 8-byte born timestamp, i.e. the
/// offset at which the born host starts.
pub const BORN_TIMESTAMP_POSITION: usize = SYSFLAG_POSITION + 4 + 8;
64
65pub fn string_to_message_properties(properties: Option<&CheetahString>) -> HashMap<CheetahString, CheetahString> {
66    let mut map = HashMap::new();
67    if let Some(properties) = properties {
68        let mut index = 0;
69        let len = properties.len();
70        while index < len {
71            let new_index = properties[index..].find(PROPERTY_SEPARATOR).map_or(len, |i| index + i);
72            if new_index - index >= 3 {
73                if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR) {
74                    let kv_sep_index = index + kv_sep_index;
75                    if kv_sep_index > index && kv_sep_index < new_index - 1 {
76                        let k = &properties[index..kv_sep_index];
77                        let v = &properties[kv_sep_index + 1..new_index];
78                        map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
79                    }
80                }
81            }
82            index = new_index + 1;
83        }
84    }
85    map
86}
87
88pub fn str_to_message_properties(properties: Option<&str>) -> HashMap<CheetahString, CheetahString> {
89    let mut map = HashMap::new();
90    if let Some(properties) = properties {
91        let mut index = 0;
92        let len = properties.len();
93        while index < len {
94            let new_index = properties[index..].find(PROPERTY_SEPARATOR).map_or(len, |i| index + i);
95            if new_index - index >= 3 {
96                if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR) {
97                    let kv_sep_index = index + kv_sep_index;
98                    if kv_sep_index > index && kv_sep_index < new_index - 1 {
99                        let k = &properties[index..kv_sep_index];
100                        let v = &properties[kv_sep_index + 1..new_index];
101                        map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
102                    }
103                }
104            }
105            index = new_index + 1;
106        }
107    }
108    map
109}
110
111pub fn message_properties_to_string(properties: &HashMap<CheetahString, CheetahString>) -> CheetahString {
112    let mut len = 0;
113    for (name, value) in properties.iter() {
114        len += name.len();
115
116        len += value.len();
117        len += 2; // separator
118    }
119
120    let mut sb = String::with_capacity(len);
121    for (name, value) in properties.iter() {
122        sb.push_str(name);
123        sb.push(NAME_VALUE_SEPARATOR);
124
125        sb.push_str(value);
126        sb.push(PROPERTY_SEPARATOR);
127    }
128    CheetahString::from_string(sb)
129}
130
131pub fn decode_client(
132    byte_buffer: &mut Bytes,
133    read_body: bool,
134    de_compress_body: bool,
135    is_set_properties_string: bool,
136    check_crc: bool,
137) -> Option<MessageClientExt> {
138    /*if let Some(msg_ext) = decode(
139        byte_buffer,
140        read_body,
141        de_compress_body,
142        false,
143        is_set_properties_string,
144        check_crc,
145    ) {
146        Some(MessageClientExt {
147            message_ext_inner: msg_ext,
148        })
149    } else {
150        None
151    }*/
152    decode(
153        byte_buffer,
154        read_body,
155        de_compress_body,
156        false,
157        is_set_properties_string,
158        check_crc,
159    )
160    .map(|msg_ext| MessageClientExt {
161        message_ext_inner: msg_ext,
162    })
163}
164
165//this method will optimize later
166pub fn decode(
167    byte_buffer: &mut Bytes,
168    read_body: bool,
169    de_compress_body: bool,
170    is_client: bool,
171    is_set_properties_string: bool,
172    check_crc: bool,
173) -> Option<MessageExt> {
174    let mut msg_ext = if is_client {
175        unimplemented!()
176    } else {
177        MessageExt::default()
178    };
179
180    // 1 TOTALSIZE
181    let store_size = byte_buffer.get_i32();
182    msg_ext.set_store_size(store_size);
183
184    // 2 MAGICCODE
185    let magic_code = byte_buffer.get_i32();
186    let version = MessageVersion::value_of_magic_code(magic_code).unwrap();
187
188    // 3 BODYCRC
189    let body_crc = byte_buffer.get_u32();
190    msg_ext.set_body_crc(body_crc);
191
192    // 4 QUEUEID
193    let queue_id = byte_buffer.get_i32();
194    msg_ext.set_queue_id(queue_id);
195
196    // 5 FLAG
197    let flag = byte_buffer.get_i32();
198    msg_ext.message.set_flag(flag);
199
200    // 6 QUEUEOFFSET
201    let queue_offset = byte_buffer.get_i64();
202    msg_ext.set_queue_offset(queue_offset);
203
204    // 7 PHYSICALOFFSET
205    let physic_offset = byte_buffer.get_i64();
206    msg_ext.set_commit_log_offset(physic_offset);
207
208    // 8 SYSFLAG
209    let sys_flag = byte_buffer.get_i32();
210    msg_ext.set_sys_flag(sys_flag);
211
212    // 9 BORNTIMESTAMP
213    let born_time_stamp = byte_buffer.get_i64();
214    msg_ext.set_born_timestamp(born_time_stamp);
215
216    // 10 BORNHOST
217    let (born_host_address, born_host_ip_length) = if sys_flag & MessageSysFlag::BORNHOST_V6_FLAG != 0 {
218        let mut born_host = [0; 16];
219        byte_buffer.copy_to_slice(&mut born_host);
220        let port = byte_buffer.get_i32();
221        (
222            SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(born_host), port as u16, 0, 0)),
223            16,
224        )
225    } else {
226        let mut born_host = [0; 4];
227        byte_buffer.copy_to_slice(&mut born_host);
228        let port = byte_buffer.get_i32();
229        (
230            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(born_host), port as u16)),
231            4,
232        )
233    };
234    msg_ext.set_born_host(born_host_address);
235
236    // 11 STORETIMESTAMP
237    let store_timestamp = byte_buffer.get_i64();
238    msg_ext.set_store_timestamp(store_timestamp);
239
240    // 12 STOREHOST
241    let (store_host_address, store_host_ip_length) = if sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG != 0 {
242        let mut store_host = [0; 16];
243        byte_buffer.copy_to_slice(&mut store_host);
244        let port = byte_buffer.get_i32();
245        (
246            SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(store_host), port as u16, 0, 0)),
247            16,
248        )
249    } else {
250        let mut store_host = [0; 4];
251        byte_buffer.copy_to_slice(&mut store_host);
252        let port = byte_buffer.get_i32();
253        (
254            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(store_host), port as u16)),
255            4,
256        )
257    };
258    msg_ext.set_store_host(store_host_address);
259
260    // 13 RECONSUMETIMES
261    let reconsume_times = byte_buffer.get_i32();
262    msg_ext.set_reconsume_times(reconsume_times);
263
264    // 14 Prepared Transaction Offset
265    let prepared_transaction_offset = byte_buffer.get_i64();
266    msg_ext.set_prepared_transaction_offset(prepared_transaction_offset);
267
268    // 15 BODY
269    let body_len = byte_buffer.get_i32();
270    if body_len > 0 {
271        // Handle reading and processing body
272        if read_body {
273            let mut body = vec![0; body_len as usize];
274            byte_buffer.copy_to_slice(&mut body);
275            if check_crc {
276                let crc = crc32(&body);
277                if crc != body_crc {
278                    return None;
279                }
280            }
281            let mut body_bytes = Bytes::from(body);
282            if de_compress_body && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG {
283                let compression_type =
284                    CompressionType::find_by_value((flag & MessageSysFlag::COMPRESSION_TYPE_COMPARATOR) >> 8);
285                body_bytes = compression_type.decompression(&body_bytes)
286            }
287            msg_ext.message.set_body(Some(body_bytes));
288        } else {
289            let _ = byte_buffer.split_to(
290                BORN_TIMESTAMP_POSITION
291                    + born_host_ip_length
292                    + 4
293                    + 8
294                    + store_host_ip_length
295                    + 4
296                    + 4
297                    + 8
298                    + 4
299                    + body_len as usize,
300            );
301        }
302    }
303
304    // 16 TOPIC
305    let topic_len = version.get_topic_length(byte_buffer);
306    let mut topic = vec![0; topic_len];
307    byte_buffer.copy_to_slice(&mut topic);
308    let topic_str = str::from_utf8(&topic).unwrap();
309    msg_ext.message.set_topic(CheetahString::from_slice(topic_str));
310
311    // 17 properties
312    let properties_length = byte_buffer.get_i16();
313    if properties_length > 0 {
314        // Handle reading and processing properties
315        let mut properties = vec![0; properties_length as usize];
316        byte_buffer.copy_to_slice(&mut properties);
317        if !is_set_properties_string {
318            //can optimize later
319            let properties_string =
320                CheetahString::from_string(String::from_utf8_lossy(properties.as_slice()).to_string());
321            let message_properties = string_to_message_properties(Some(&properties_string));
322            *msg_ext.message.properties_mut() = MessageProperties::from_map(message_properties);
323        } else {
324            let properties_string =
325                CheetahString::from_string(String::from_utf8_lossy(properties.as_slice()).to_string());
326            let mut message_properties = string_to_message_properties(Some(&properties_string));
327            message_properties.insert(CheetahString::from_static_str("propertiesString"), properties_string);
328            *msg_ext.message.properties_mut() = MessageProperties::from_map(message_properties);
329        }
330    }
331    let msg_id = build_message_id(store_host_address, physic_offset);
332    msg_ext.set_msg_id(CheetahString::from_string(msg_id));
333
334    if is_client {
335        unimplemented!()
336    }
337
338    Some(msg_ext)
339}
340
341pub fn count_inner_msg_num(bytes: Option<Bytes>) -> u32 {
342    match bytes {
343        None => 0,
344        Some(mut bytes) => {
345            let mut count = 0;
346            while bytes.has_remaining() {
347                let size = bytes.slice(0..4).get_i32();
348                if size as usize > bytes.len() {
349                    break;
350                }
351                let _ = bytes.split_to(size as usize);
352                count += 1;
353            }
354            count
355        }
356    }
357}
358
359pub fn encode_messages(messages: &[Message]) -> Bytes {
360    let mut bytes = BytesMut::new();
361    //let mut all_size = 0;
362    for message in messages {
363        let message_bytes = encode_message(message);
364        //all_size += message_bytes.len();
365        bytes.put_slice(&message_bytes);
366    }
367    bytes.freeze()
368}
369
370pub fn encode_message(message: &Message) -> Bytes {
371    let body = message.get_body().unwrap();
372    let body_len = body.len();
373    let properties = message_properties_to_string(message.properties().as_map());
374    let properties_bytes = properties.as_bytes();
375    let properties_length = properties_bytes.len();
376
377    let store_size = 4 // 1 TOTALSIZE
378             + 4 // 2 MAGICCOD
379             + 4 // 3 BODYCRC
380             + 4 // 4 FLAG
381             + 4 + body_len // 4 BODY
382             + 2 + properties_length;
383
384    let mut bytes = BytesMut::with_capacity(store_size);
385
386    // 1 TOTALSIZE
387    bytes.put_i32(store_size as i32);
388
389    // 2 MAGICCODE
390    bytes.put_i32(0);
391
392    // 3 BODYCRC
393    bytes.put_u32(0);
394
395    // 4 FLAG
396    bytes.put_i32(message.flag());
397
398    // 5 BODY
399    bytes.put_i32(body_len as i32);
400    bytes.put_slice(body);
401
402    // 6 PROPERTIES
403    bytes.put_i16(properties_length as i16);
404    bytes.put_slice(properties_bytes);
405
406    bytes.freeze()
407}
408
409pub fn decodes_batch(byte_buffer: &mut Bytes, read_body: bool, decompress_body: bool) -> Vec<MessageExt> {
410    let mut messages = Vec::new();
411    while byte_buffer.has_remaining() {
412        if let Some(msg_ext) = decode(byte_buffer, read_body, decompress_body, false, false, false) {
413            messages.push(msg_ext);
414        } else {
415            break;
416        }
417    }
418    messages
419}
420
421pub fn decodes_batch_client(byte_buffer: &mut Bytes, read_body: bool, decompress_body: bool) -> Vec<MessageClientExt> {
422    let mut messages = Vec::new();
423    while byte_buffer.has_remaining() {
424        if let Some(msg_ext) = decode_client(byte_buffer, read_body, decompress_body, false, false) {
425            messages.push(msg_ext);
426        } else {
427            break;
428        }
429    }
430    messages
431}
432
433pub fn decode_messages_from(mut message_ext: MessageExt, vec_: &mut Vec<MessageExt>) {
434    let messages = decode_messages(message_ext.message.body_mut().raw_mut().as_mut().unwrap());
435    for message in messages {
436        let mut message_ext_inner = MessageExt {
437            message,
438            ..MessageExt::default()
439        };
440        message_ext_inner.set_topic(message_ext.topic().to_owned());
441        message_ext_inner.queue_offset = message_ext.queue_offset;
442        message_ext_inner.queue_id = message_ext.queue_id;
443        message_ext_inner.set_flag(message_ext.get_flag());
444        //MessageAccessor::set_properties(&mut
445        // message_client_ext,message.get_properties().clone()); messageClientExt.
446        // setBody(message.getBody())
447        message_ext_inner.store_host = message_ext.store_host;
448        message_ext_inner.born_host = message_ext.born_host;
449        message_ext_inner.store_timestamp = message_ext.store_timestamp;
450        message_ext_inner.born_timestamp = message_ext.born_timestamp;
451        message_ext_inner.sys_flag = message_ext.sys_flag;
452        message_ext_inner.commit_log_offset = message_ext.commit_log_offset;
453        message_ext_inner.set_wait_store_msg_ok(message_ext.is_wait_store_msg_ok());
454        vec_.push(message_ext_inner);
455    }
456}
457
458pub fn decode_messages(buffer: &mut Bytes) -> Vec<Message> {
459    let mut messages = Vec::new();
460    while buffer.has_remaining() {
461        let message = decode_message(buffer);
462        messages.push(message);
463    }
464    messages
465}
466
467pub fn decode_message(buffer: &mut Bytes) -> Message {
468    // 1 TOTALSIZE
469    let _ = buffer.get_i32();
470
471    // 2 MAGICCODE
472    let _ = buffer.get_i32();
473
474    // 3 BODYCRC
475    let _ = buffer.get_i32();
476
477    // 4 FLAG
478    let flag = buffer.get_i32();
479
480    // 5 BODY
481    let body_len = buffer.get_i32();
482    let body = buffer.split_to(body_len as usize);
483
484    // 6 properties
485    let properties_length = buffer.get_i16();
486    let properties = buffer.split_to(properties_length as usize);
487    //string_to_message_properties(Some(&String::from_utf8_lossy(properties.as_ref()).
488    // to_string()));
489    let message_properties = str_to_message_properties(Some(str::from_utf8(&properties).unwrap()));
490
491    let mut message = Message::default();
492    message.set_body(Some(body));
493    message.set_properties(message_properties);
494    message.set_flag(flag);
495    message
496}
497
/// Hex length of a message ID holding a 4-byte IP, a 4-byte port and an 8-byte offset.
const MSG_ID_IPV4_LEN: usize = 32;
/// Hex length of a message ID holding a 16-byte IP, a 4-byte port and an 8-byte offset.
const MSG_ID_IPV6_LEN: usize = 56;

/// Checks that `msg_id`, ignoring surrounding whitespace, looks like an encoded message ID.
///
/// # Errors
/// Returns a description of the problem when the trimmed ID is empty, is neither 32 nor 56
/// bytes long, or contains a character that is not an ASCII hexadecimal digit.
pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
    let trimmed = msg_id.trim();

    match trimmed.len() {
        0 => Err("Message ID cannot be empty".to_string()),
        MSG_ID_IPV4_LEN | MSG_ID_IPV6_LEN => {
            if trimmed.bytes().any(|b| !b.is_ascii_hexdigit()) {
                Err("Message ID must be a valid hexadecimal string".to_string())
            } else {
                Ok(())
            }
        }
        len => Err(format!(
            "Invalid message ID length: {len}. Expected {MSG_ID_IPV4_LEN} characters (IPv4) or \
             {MSG_ID_IPV6_LEN} characters (IPv6)"
        )),
    }
}
522
523pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
524    validate_message_id(msg_id)?;
525    let bytes = util_all::string_to_bytes(msg_id)
526        .ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
527    let mut buffer = Bytes::from(bytes);
528    let address = if msg_id.len() == 32 {
529        let mut ip = [0u8; 4];
530        buffer.copy_to_slice(&mut ip);
531        let port = buffer.get_i32();
532        SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
533    } else {
534        let mut ip = [0u8; 16];
535        buffer.copy_to_slice(&mut ip);
536        let port = buffer.get_i32();
537        SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
538    };
539    Ok(MessageId {
540        address,
541        offset: buffer.get_i64(),
542    })
543}
544
545pub fn encode(message_ext: &MessageExt, need_compress: bool) -> rocketmq_error::RocketMQResult<Bytes> {
546    let body = message_ext.get_body().unwrap();
547    let topic = message_ext.topic().as_bytes();
548    let topic_len = topic.len();
549    let properties = message_properties_to_string(message_ext.get_properties());
550    let properties_bytes = properties.as_bytes();
551    let properties_length = properties_bytes.len();
552    let sys_flag = message_ext.sys_flag;
553    let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
554        8
555    } else {
556        20
557    };
558    let store_host_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
559        8
560    } else {
561        20
562    };
563    let new_body = if need_compress && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG {
564        let compressor = CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
565        let compressed_body = compressor.compress(body, 5)?;
566        Some(compressed_body)
567    } else {
568        None
569    };
570    let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
571    let store_size = message_ext.store_size;
572    let mut byte_buffer = if store_size > 0 {
573        BytesMut::with_capacity(store_size as usize)
574    } else {
575        let store_size = 4 // 1 TOTALSIZE
576             + 4 // 2 MAGICCODE
577             + 4 // 3 BODYCRC
578             + 4 // 4 QUEUEID
579             + 4 // 5 FLAG
580             + 8 // 6 QUEUEOFFSET
581             + 8 // 7 PHYSICALOFFSET
582             + 4 // 8 SYSFLAG
583             + 8 // 9 BORNTIMESTAMP
584             + born_host_length // 10 BORNHOST
585             + 8 // 11 STORETIMESTAMP
586             + store_host_address_length // 12 STOREHOSTADDRESS
587             + 4 // 13 RECONSUMETIMES
588             + 8 // 14 Prepared Transaction Offset
589             + 4 + body_len // 14 BODY
590             + 1 + topic_len // 15 TOPIC
591             + 2 + properties_length; // 16 propertiesLength
592        BytesMut::with_capacity(store_size)
593    };
594
595    // 1 TOTALSIZE
596    byte_buffer.put_i32(store_size);
597
598    // 2 MAGICCODE
599    byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
600
601    // 3 BODYCRC
602    byte_buffer.put_u32(message_ext.body_crc);
603
604    // 4 QUEUEID
605    byte_buffer.put_i32(message_ext.queue_id);
606
607    // 5 FLAG
608    byte_buffer.put_i32(message_ext.message.flag());
609
610    // 6 QUEUEOFFSET
611    byte_buffer.put_i64(message_ext.queue_offset);
612
613    // 7 PHYSICALOFFSET
614    byte_buffer.put_i64(message_ext.commit_log_offset);
615
616    // 8 SYSFLAG
617    byte_buffer.put_i32(message_ext.sys_flag);
618
619    // 9 BORNTIMESTAMP
620    byte_buffer.put_i64(message_ext.born_timestamp);
621
622    // 10 BORNHOST
623
624    let born_host = message_ext.born_host;
625    match born_host {
626        SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
627        SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
628    };
629
630    byte_buffer.put_i32(born_host.port() as i32);
631
632    // 11 STORETIMESTAMP
633    byte_buffer.put_i64(message_ext.store_timestamp);
634
635    // 12 STOREHOST
636
637    let store_host = message_ext.store_host;
638    match store_host {
639        SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
640        SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
641    };
642
643    byte_buffer.put_i32(store_host.port() as i32);
644
645    // 13 RECONSUMETIMES
646    byte_buffer.put_i32(message_ext.reconsume_times);
647
648    // 14 Prepared Transaction Offset
649    byte_buffer.put_i64(message_ext.prepared_transaction_offset);
650
651    // 15 BODY
652    byte_buffer.put_i32(body_len as i32);
653    if let Some(new_body) = new_body {
654        byte_buffer.put_slice(&new_body);
655    } else {
656        byte_buffer.put_slice(body);
657    }
658
659    // 16 TOPIC
660    byte_buffer.put_u8(topic_len as u8);
661    byte_buffer.put_slice(topic);
662
663    // 17 properties
664    byte_buffer.put_i16(properties_length as i16);
665    byte_buffer.put_slice(properties_bytes);
666
667    Ok(byte_buffer.freeze())
668}
669
670pub fn encode_uniquely(message_ext: &MessageExt, need_compress: bool) -> rocketmq_error::RocketMQResult<Bytes> {
671    let body = message_ext.get_body().unwrap();
672    let topics = message_ext.topic().as_bytes();
673    let topic_len = topics.len();
674    let properties = message_properties_to_string(message_ext.get_properties());
675    let properties_bytes = properties.as_bytes();
676    let properties_length = properties_bytes.len();
677    let sys_flag = message_ext.sys_flag;
678    let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
679        8
680    } else {
681        20
682    };
683    let new_body = if need_compress && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG {
684        let compressor = CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
685        let compressed_body = compressor.compress(body, 5)?;
686        Some(compressed_body)
687    } else {
688        None
689    };
690    let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
691    let store_size = message_ext.store_size;
692    let mut byte_buffer = if store_size > 0 {
693        BytesMut::with_capacity((store_size - 8) as usize)
694    } else {
695        let store_size = 4 // 1 TOTALSIZE
696             + 4 // 2 MAGICCODE
697             + 4 // 3 BODYCRC
698             + 4 // 4 QUEUEID
699             + 4 // 5 FLAG
700             + 8 // 6 QUEUEOFFSET
701             + 8 // 7 PHYSICALOFFSET
702             + 4 // 8 SYSFLAG
703             + 8 // 9 BORNTIMESTAMP
704             + born_host_length // 10 BORNHOST
705             + 4 // 11 RECONSUMETIMES
706             + 8 // 12 Prepared Transaction Offset
707             + 4 + body_len // 13 BODY
708             + 1 + topic_len // 14 TOPIC
709             + 2 + properties_length; // 15 propertiesLength
710        BytesMut::with_capacity(store_size)
711    };
712
713    // 1 TOTALSIZE
714    byte_buffer.put_i32(store_size);
715
716    // 2 MAGICCODE
717    byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
718
719    // 3 BODYCRC
720    byte_buffer.put_u32(message_ext.body_crc);
721
722    // 4 QUEUEID
723    byte_buffer.put_i32(message_ext.queue_id);
724
725    // 5 FLAG
726    byte_buffer.put_i32(message_ext.message.flag());
727
728    // 6 QUEUEOFFSET
729    byte_buffer.put_i64(message_ext.queue_offset);
730
731    // 7 PHYSICALOFFSET
732    byte_buffer.put_i64(message_ext.commit_log_offset);
733
734    // 8 SYSFLAG
735    byte_buffer.put_i32(message_ext.sys_flag);
736
737    // 9 BORNTIMESTAMP
738    byte_buffer.put_i64(message_ext.born_timestamp);
739
740    // 10 BORNHOST
741
742    let born_host = message_ext.born_host;
743    match born_host {
744        SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
745        SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
746    };
747    byte_buffer.put_i32(born_host.port() as i32);
748
749    // 11 RECONSUMETIMES
750    byte_buffer.put_i32(message_ext.reconsume_times);
751
752    // 12 Prepared Transaction Offset
753    byte_buffer.put_i64(message_ext.prepared_transaction_offset);
754
755    // 13 BODY
756    byte_buffer.put_i32(body_len as i32);
757    if let Some(new_body) = new_body {
758        byte_buffer.put_slice(&new_body);
759    } else {
760        byte_buffer.put_slice(body);
761    }
762
763    // 14 TOPIC
764    byte_buffer.put_i16(topic_len as i16);
765    byte_buffer.put_slice(topics);
766
767    // 15 properties
768    byte_buffer.put_i16(properties_length as i16);
769    byte_buffer.put_slice(properties_bytes);
770
771    Ok(byte_buffer.freeze())
772}
773
774pub fn create_crc32(mut input: &mut [u8], crc32: u32) {
775    input.put(MessageConst::PROPERTY_CRC32.as_bytes());
776    input.put_u8(NAME_VALUE_SEPARATOR as u8);
777    let mut crc32 = crc32;
778    for _ in 0..10 {
779        let mut b = b'0';
780        if crc32 > 0 {
781            b += (crc32 % 10) as u8;
782            crc32 /= 10;
783        }
784        input.put_u8(b);
785    }
786    input.put_u8(PROPERTY_SEPARATOR as u8);
787}
788
789pub fn decode_properties(bytes: &mut Bytes) -> Option<HashMap<CheetahString, CheetahString>> {
790    // Ensure we have enough bytes to read SYSFLAG and MAGICCODE.
791    if bytes.len() < SYSFLAG_POSITION + 4 {
792        return None;
793    }
794
795    // Read sysFlag and magicCode using fixed positions.
796    let sys_flag = BigEndian::read_i32(&bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4]);
797    let magic_code = BigEndian::read_i32(&bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4]);
798    let version = match MessageVersion::value_of_magic_code(magic_code) {
799        Ok(value) => value,
800        Err(_) => return None,
801    };
802
803    // Determine address lengths.
804    let bornhost_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
805        8
806    } else {
807        20
808    };
809    let storehost_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
810        8
811    } else {
812        20
813    };
814
815    // Calculate the bodySizePosition as in Java.
816    let body_size_position = 4   // TOTALSIZE
817        + 4   // MAGICCODE
818        + 4   // BODYCRC
819        + 4   // QUEUEID
820        + 4   // FLAG
821        + 8   // QUEUEOFFSET
822        + 8   // PHYSICALOFFSET
823        + 4   // SYSFLAG
824        + 8   // BORNTIMESTAMP
825        + bornhost_length // BORNHOST
826        + 8   // STORETIMESTAMP
827        + storehost_address_length // STOREHOSTADDRESS
828        + 4   // RECONSUMETIMES
829        + 8; // Prepared Transaction Offset
830
831    if bytes.len() < body_size_position + 4 {
832        return None;
833    }
834
835    // Read the body size stored as an int.
836    let body_size = BigEndian::read_i32(&bytes[body_size_position..body_size_position + 4]) as usize;
837
838    // Compute the topic length position.
839    let topic_length_position = body_size_position + 4 + body_size;
840    if bytes.len() < topic_length_position {
841        return None;
842    }
843
844    // Create a Cursor over the slice starting at the topic length position.
845    let slice = &bytes[topic_length_position..];
846    let cursor = Cursor::new(slice);
847    let topic_length_size = version.get_topic_length_size();
848    bytes.advance(topic_length_position);
849    let topic_length = version.get_topic_length(bytes);
850
851    // Calculate the properties position.
852    let properties_position = topic_length_position + topic_length_size + topic_length;
853    if bytes.len() < properties_position + 2 {
854        return None;
855    }
856
857    // Read a short (2 bytes) as propertiesLength.
858    let properties_length = BigEndian::read_i16(&bytes[properties_position..properties_position + 2]);
859
860    // Advance past the short value.
861    let properties_start = properties_position + 2;
862    if properties_length > 0 {
863        let end = properties_start + (properties_length as usize);
864        if bytes.len() < end {
865            return None;
866        }
867        let properties_bytes = &bytes[properties_start..end];
868        if let Ok(properties_string) = String::from_utf8(properties_bytes.to_vec()) {
869            Some(string_to_message_properties(Some(&CheetahString::from_string(
870                properties_string,
871            ))))
872        } else {
873            None
874        }
875    } else {
876        None
877    }
878}
879
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use bytes::BytesMut;

    use super::*;

    /// Builds a buffer of `count` back-to-back frames, each an i32 `8` followed by four zero
    /// bytes (eight bytes per frame).
    fn eight_byte_frames(count: usize) -> Bytes {
        let mut buf = BytesMut::new();
        for _ in 0..count {
            buf.put_i32(8);
            buf.put_slice(&[0, 0, 0, 0]);
        }
        buf.freeze()
    }

    /// Returns a default `MessageExt` whose body has been set to `body`.
    fn message_with_body(body: Bytes) -> MessageExt {
        let mut message = MessageExt::default();
        message.set_body(body);
        message
    }

    /// Common length term shared by the `decode_properties` buffers below.
    ///
    /// NOTE(review): the tests also write a zero i32 at this offset, presumably standing in for
    /// a body-size field — confirm against the layout `decode_properties` actually reads.
    fn base_len() -> usize {
        SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
    }

    /// Creates a zero-filled buffer of `len` bytes, then writes a zero i32 at
    /// `SYSFLAG_POSITION` and `magic` at `MESSAGE_MAGIC_CODE_POSITION`.
    fn frame_with_magic(len: usize, magic: i32) -> BytesMut {
        let mut buf = BytesMut::from_iter(vec![0u8; len]);
        BigEndian::write_i32(&mut buf[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
        BigEndian::write_i32(
            &mut buf[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
            magic,
        );
        buf
    }

    /// Two eight-byte frames are counted as two inner messages.
    #[test]
    fn count_inner_msg_num_counts_correctly_for_multiple_messages() {
        assert_eq!(count_inner_msg_num(Some(eight_byte_frames(2))), 2);
    }

    /// One eight-byte frame is counted as one inner message.
    #[test]
    fn count_inner_msg_num_counts_correctly_for_single_message() {
        assert_eq!(count_inner_msg_num(Some(eight_byte_frames(1))), 1);
    }

    /// An empty buffer contains no inner messages.
    #[test]
    fn count_inner_msg_num_counts_zero_for_no_messages() {
        let empty = BytesMut::new().freeze();
        assert_eq!(count_inner_msg_num(Some(empty)), 0);
    }

    /// A buffer holding only the i32 `4` yields a count of one.
    ///
    /// NOTE(review): despite the test name, the assertion expects this lone 4-byte frame to be
    /// counted rather than ignored — confirm the intended semantics of `count_inner_msg_num`.
    #[test]
    fn count_inner_msg_num_ignores_incomplete_messages() {
        let mut header_only = BytesMut::new();
        header_only.put_i32(4);
        assert_eq!(count_inner_msg_num(Some(header_only.freeze())), 1);
    }

    /// A 32-character id is expected to decode to an IPv4 socket address and an offset.
    #[test]
    fn decode_message_id_ipv4() {
        let decoded = decode_message_id("7F0000010007D8260BF075769D36C348").unwrap();
        assert_eq!(decoded.address, "127.0.0.1:55334".parse().unwrap());
        assert_eq!(decoded.offset, 860316681131967304);
    }

    /// Encoding a short body with the compression flag set succeeds and yields bytes.
    #[test]
    fn encode_with_compression() {
        let message = message_with_body(Bytes::from("Hello, World!"));
        let encoded = encode(&message, true);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// Encoding a short body with the compression flag cleared succeeds and yields bytes.
    #[test]
    fn encode_without_compression() {
        let message = message_with_body(Bytes::from("Hello, World!"));
        let encoded = encode(&message, false);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// Encoding a message with an empty body still produces a non-empty buffer.
    #[test]
    fn encode_with_empty_body() {
        let message = message_with_body(Bytes::new());
        let encoded = encode(&message, false);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// Encoding a 1 MiB zero-filled body succeeds and yields bytes.
    #[test]
    fn encode_with_large_body() {
        let payload = vec![0u8; 1024 * 1024];
        let message = message_with_body(Bytes::from(payload));
        let encoded = encode(&message, false);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// `encode_uniquely` with the compression flag set succeeds and yields bytes.
    #[test]
    fn encode_uniquely_with_compression() {
        let message = message_with_body(Bytes::from("Hello, World!"));
        let encoded = encode_uniquely(&message, true);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// `encode_uniquely` with the compression flag cleared succeeds and yields bytes.
    #[test]
    fn encode_uniquely_without_compression() {
        let message = message_with_body(Bytes::from("Hello, World!"));
        let encoded = encode_uniquely(&message, false);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// `encode_uniquely` on an empty body still produces a non-empty buffer.
    #[test]
    fn encode_uniquely_with_empty_body() {
        let message = message_with_body(Bytes::new());
        let encoded = encode_uniquely(&message, false);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// `encode_uniquely` on a 1 MiB zero-filled body succeeds and yields bytes.
    #[test]
    fn encode_uniquely_with_large_body() {
        let payload = vec![0u8; 1024 * 1024];
        let message = message_with_body(Bytes::from(payload));
        let encoded = encode_uniquely(&message, false);
        assert!(encoded.is_ok());
        assert!(!encoded.unwrap().is_empty());
    }

    /// A buffer one byte short of `SYSFLAG_POSITION + 4` decodes to `None`.
    #[test]
    fn decode_properties_returns_none_if_bytes_length_is_insufficient() {
        let mut too_short = Bytes::from(vec![0; SYSFLAG_POSITION + 3]);
        assert!(decode_properties(&mut too_short).is_none());
    }

    /// A magic code of `-1` makes `decode_properties` return `None`.
    #[test]
    fn decode_properties_returns_none_if_magic_code_is_invalid() {
        let frame = frame_with_magic(base_len(), -1);
        assert!(decode_properties(&mut frame.freeze()).is_none());
    }

    /// A valid magic code followed by only four further zero bytes decodes to `None`.
    #[test]
    fn decode_properties_returns_none_if_body_size_is_insufficient() {
        let frame = frame_with_magic(base_len() + 4, MESSAGE_MAGIC_CODE);
        assert!(decode_properties(&mut frame.freeze()).is_none());
    }

    /// A valid magic code with a zero i32 at `base_len()` and eight trailing bytes decodes to
    /// `None`.
    #[test]
    fn decode_properties_returns_none_if_topic_length_is_insufficient() {
        let at = base_len();
        let mut frame = frame_with_magic(at + 4 + 4, MESSAGE_MAGIC_CODE);
        BigEndian::write_i32(&mut frame[at..at + 4], 0);
        assert!(decode_properties(&mut frame.freeze()).is_none());
    }

    /// As above, plus a trailing i16 `1` with no bytes following it; decodes to `None`.
    #[test]
    fn decode_properties_returns_none_if_properties_length_is_insufficient() {
        let at = base_len();
        let mut frame = frame_with_magic(at + 4 + 4 + 2, MESSAGE_MAGIC_CODE);
        BigEndian::write_i32(&mut frame[at..at + 4], 0);
        BigEndian::write_i16(&mut frame[at + 4 + 4..at + 4 + 4 + 2], 1);
        assert!(decode_properties(&mut frame.freeze()).is_none());
    }

    /// A 32-character id passes validation.
    #[test]
    fn validate_message_id_ipv4_32_chars() {
        assert!(validate_message_id("AC11000100002A9F0000000000000001").is_ok());
    }

    /// A 56-character id passes validation.
    #[test]
    fn validate_message_id_ipv6_56_chars() {
        assert!(validate_message_id("20010db800000000000000000000000100002A9F0000000000000001").is_ok());
    }

    /// A 40-character id is rejected, and the error text names the expected lengths.
    #[test]
    fn validate_message_id_ipv6_40_chars_rejected() {
        let outcome = validate_message_id("20010db800000000000000000000000100000001");
        assert!(outcome.is_err());
        if let Err(reason) = outcome {
            for fragment in ["Invalid message ID length", "56 characters (IPv6)"] {
                assert!(reason.contains(fragment));
            }
        }
    }

    /// A 56-character id is expected to decode to an IPv6 socket address and an offset.
    #[test]
    fn decode_message_id_ipv6() {
        let decoded = decode_message_id("20010db800000000000000000000000100002A9F0000000000000001").unwrap();
        assert_eq!(decoded.address, "[2001:db8::1]:10911".parse().unwrap());
        assert_eq!(decoded.offset, 1);
    }

    /// Same as above with non-zero groups in the middle of the IPv6 address.
    #[test]
    fn decode_message_id_ipv6_full_address() {
        let decoded = decode_message_id("20010db81234567800000000abcdef0100002A9F0000000000001234").unwrap();
        assert_eq!(
            decoded.address,
            "[2001:db8:1234:5678::abcd:ef01]:10911".parse().unwrap()
        );
        assert_eq!(decoded.offset, 4660);
    }
}