rocketmq_common/common/message/
message_decoder.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::io::Cursor;
19use std::io::Write;
20use std::net::IpAddr;
21use std::net::Ipv4Addr;
22use std::net::Ipv6Addr;
23use std::net::SocketAddr;
24use std::net::SocketAddrV4;
25use std::net::SocketAddrV6;
26use std::str;
27
28use byteorder::BigEndian;
29use byteorder::ByteOrder;
30use bytes::Buf;
31use bytes::BufMut;
32use bytes::Bytes;
33use bytes::BytesMut;
34use cheetah_string::CheetahString;
35
36use crate::common::compression::compression_type::CompressionType;
37use crate::common::compression::compressor_factory::CompressorFactory;
38use crate::common::message::message_client_ext::MessageClientExt;
39use crate::common::message::message_ext::MessageExt;
40use crate::common::message::message_id::MessageId;
41use crate::common::message::message_single::Message;
42use crate::common::message::MessageConst;
43use crate::common::message::MessageTrait;
44use crate::common::message::MessageVersion;
45use crate::common::sys_flag::message_sys_flag::MessageSysFlag;
46use crate::utils::util_all;
47use crate::CRC32Utils::crc32;
48use crate::MessageAccessor::MessageAccessor;
49use crate::MessageUtils::build_message_id;
50
51pub const CHARSET_UTF8: &str = "UTF-8";
52pub const MESSAGE_MAGIC_CODE_POSITION: usize = 4;
53pub const MESSAGE_FLAG_POSITION: usize = 16;
54pub const MESSAGE_PHYSIC_OFFSET_POSITION: usize = 28;
55pub const MESSAGE_STORE_TIMESTAMP_POSITION: usize = 56;
56pub const MESSAGE_MAGIC_CODE: i32 = -626843481;
57pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;
58pub const BLANK_MAGIC_CODE: i32 = -875286124;
59pub const NAME_VALUE_SEPARATOR: char = '\u{0001}';
60pub const PROPERTY_SEPARATOR: char = '\u{0002}';
61pub const PHY_POS_POSITION: usize = 4 + 4 + 4 + 4 + 4 + 8;
62pub const QUEUE_OFFSET_POSITION: usize = 4 + 4 + 4 + 4 + 4;
63pub const SYSFLAG_POSITION: usize = 4 + 4 + 4 + 4 + 4 + 8 + 8;
64pub const BORN_TIMESTAMP_POSITION: usize = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8;
65
66pub fn string_to_message_properties(
67    properties: Option<&CheetahString>,
68) -> HashMap<CheetahString, CheetahString> {
69    let mut map = HashMap::new();
70    if let Some(properties) = properties {
71        let mut index = 0;
72        let len = properties.len();
73        while index < len {
74            let new_index = properties[index..]
75                .find(PROPERTY_SEPARATOR)
76                .map_or(len, |i| index + i);
77            if new_index - index >= 3 {
78                if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR)
79                {
80                    let kv_sep_index = index + kv_sep_index;
81                    if kv_sep_index > index && kv_sep_index < new_index - 1 {
82                        let k = &properties[index..kv_sep_index];
83                        let v = &properties[kv_sep_index + 1..new_index];
84                        map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
85                    }
86                }
87            }
88            index = new_index + 1;
89        }
90    }
91    map
92}
93
94pub fn str_to_message_properties(
95    properties: Option<&str>,
96) -> HashMap<CheetahString, CheetahString> {
97    let mut map = HashMap::new();
98    if let Some(properties) = properties {
99        let mut index = 0;
100        let len = properties.len();
101        while index < len {
102            let new_index = properties[index..]
103                .find(PROPERTY_SEPARATOR)
104                .map_or(len, |i| index + i);
105            if new_index - index >= 3 {
106                if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR)
107                {
108                    let kv_sep_index = index + kv_sep_index;
109                    if kv_sep_index > index && kv_sep_index < new_index - 1 {
110                        let k = &properties[index..kv_sep_index];
111                        let v = &properties[kv_sep_index + 1..new_index];
112                        map.insert(CheetahString::from_slice(k), CheetahString::from_slice(v));
113                    }
114                }
115            }
116            index = new_index + 1;
117        }
118    }
119    map
120}
121
122pub fn message_properties_to_string(
123    properties: &HashMap<CheetahString, CheetahString>,
124) -> CheetahString {
125    let mut len = 0;
126    for (name, value) in properties.iter() {
127        len += name.len();
128
129        len += value.len();
130        len += 2; // separator
131    }
132
133    let mut sb = String::with_capacity(len);
134    for (name, value) in properties.iter() {
135        sb.push_str(name);
136        sb.push(NAME_VALUE_SEPARATOR);
137
138        sb.push_str(value);
139        sb.push(PROPERTY_SEPARATOR);
140    }
141    CheetahString::from_string(sb)
142}
143
144pub fn decode_client(
145    byte_buffer: &mut Bytes,
146    read_body: bool,
147    de_compress_body: bool,
148    is_set_properties_string: bool,
149    check_crc: bool,
150) -> Option<MessageClientExt> {
151    /*if let Some(msg_ext) = decode(
152        byte_buffer,
153        read_body,
154        de_compress_body,
155        false,
156        is_set_properties_string,
157        check_crc,
158    ) {
159        Some(MessageClientExt {
160            message_ext_inner: msg_ext,
161        })
162    } else {
163        None
164    }*/
165    decode(
166        byte_buffer,
167        read_body,
168        de_compress_body,
169        false,
170        is_set_properties_string,
171        check_crc,
172    )
173    .map(|msg_ext| MessageClientExt {
174        message_ext_inner: msg_ext,
175    })
176}
177
178//this method will optimize later
179pub fn decode(
180    byte_buffer: &mut Bytes,
181    read_body: bool,
182    de_compress_body: bool,
183    is_client: bool,
184    is_set_properties_string: bool,
185    check_crc: bool,
186) -> Option<MessageExt> {
187    let mut msg_ext = if is_client {
188        unimplemented!()
189    } else {
190        MessageExt::default()
191    };
192
193    // 1 TOTALSIZE
194    let store_size = byte_buffer.get_i32();
195    msg_ext.set_store_size(store_size);
196
197    // 2 MAGICCODE
198    let magic_code = byte_buffer.get_i32();
199    let version = MessageVersion::value_of_magic_code(magic_code).unwrap();
200
201    // 3 BODYCRC
202    let body_crc = byte_buffer.get_u32();
203    msg_ext.set_body_crc(body_crc);
204
205    // 4 QUEUEID
206    let queue_id = byte_buffer.get_i32();
207    msg_ext.set_queue_id(queue_id);
208
209    // 5 FLAG
210    let flag = byte_buffer.get_i32();
211    msg_ext.message.flag = flag;
212
213    // 6 QUEUEOFFSET
214    let queue_offset = byte_buffer.get_i64();
215    msg_ext.set_queue_offset(queue_offset);
216
217    // 7 PHYSICALOFFSET
218    let physic_offset = byte_buffer.get_i64();
219    msg_ext.set_commit_log_offset(physic_offset);
220
221    // 8 SYSFLAG
222    let sys_flag = byte_buffer.get_i32();
223    msg_ext.set_sys_flag(sys_flag);
224
225    // 9 BORNTIMESTAMP
226    let born_time_stamp = byte_buffer.get_i64();
227    msg_ext.set_born_timestamp(born_time_stamp);
228
229    // 10 BORNHOST
230    let (born_host_address, born_host_ip_length) =
231        if sys_flag & MessageSysFlag::BORNHOST_V6_FLAG != 0 {
232            let mut born_host = [0; 16];
233            byte_buffer.copy_to_slice(&mut born_host);
234            let port = byte_buffer.get_i32();
235            (
236                SocketAddr::V6(SocketAddrV6::new(
237                    Ipv6Addr::from(born_host),
238                    port as u16,
239                    0,
240                    0,
241                )),
242                16,
243            )
244        } else {
245            let mut born_host = [0; 4];
246            byte_buffer.copy_to_slice(&mut born_host);
247            let port = byte_buffer.get_i32();
248            (
249                SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(born_host), port as u16)),
250                4,
251            )
252        };
253    msg_ext.set_born_host(born_host_address);
254
255    // 11 STORETIMESTAMP
256    let store_timestamp = byte_buffer.get_i64();
257    msg_ext.set_store_timestamp(store_timestamp);
258
259    // 12 STOREHOST
260    let (store_host_address, store_host_ip_length) =
261        if sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG != 0 {
262            let mut store_host = [0; 16];
263            byte_buffer.copy_to_slice(&mut store_host);
264            let port = byte_buffer.get_i32();
265            (
266                SocketAddr::V6(SocketAddrV6::new(
267                    Ipv6Addr::from(store_host),
268                    port as u16,
269                    0,
270                    0,
271                )),
272                16,
273            )
274        } else {
275            let mut store_host = [0; 4];
276            byte_buffer.copy_to_slice(&mut store_host);
277            let port = byte_buffer.get_i32();
278            (
279                SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(store_host), port as u16)),
280                4,
281            )
282        };
283    msg_ext.set_store_host(store_host_address);
284
285    // 13 RECONSUMETIMES
286    let reconsume_times = byte_buffer.get_i32();
287    msg_ext.set_reconsume_times(reconsume_times);
288
289    // 14 Prepared Transaction Offset
290    let prepared_transaction_offset = byte_buffer.get_i64();
291    msg_ext.set_prepared_transaction_offset(prepared_transaction_offset);
292
293    // 15 BODY
294    let body_len = byte_buffer.get_i32();
295    if body_len > 0 {
296        // Handle reading and processing body
297        if read_body {
298            let mut body = vec![0; body_len as usize];
299            byte_buffer.copy_to_slice(&mut body);
300            if check_crc {
301                let crc = crc32(&body);
302                if crc != body_crc {
303                    return None;
304                }
305            }
306            let mut body_bytes = Bytes::from(body);
307            if de_compress_body
308                && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG
309            {
310                let compression_type = CompressionType::find_by_value(
311                    (flag & MessageSysFlag::COMPRESSION_TYPE_COMPARATOR) >> 8,
312                );
313                body_bytes = compression_type.decompression(&body_bytes)
314            }
315            msg_ext.message.body = Some(body_bytes);
316        } else {
317            let _ = byte_buffer.split_to(
318                BORN_TIMESTAMP_POSITION
319                    + born_host_ip_length
320                    + 4
321                    + 8
322                    + store_host_ip_length
323                    + 4
324                    + 4
325                    + 8
326                    + 4
327                    + body_len as usize,
328            );
329        }
330    }
331
332    // 16 TOPIC
333    let topic_len = version.get_topic_length(byte_buffer);
334    let mut topic = vec![0; topic_len];
335    byte_buffer.copy_to_slice(&mut topic);
336    let topic_str = str::from_utf8(&topic).unwrap();
337    msg_ext.message.topic = CheetahString::from_slice(topic_str);
338
339    // 17 properties
340    let properties_length = byte_buffer.get_i16();
341    if properties_length > 0 {
342        // Handle reading and processing properties
343        let mut properties = vec![0; properties_length as usize];
344        byte_buffer.copy_to_slice(&mut properties);
345        if !is_set_properties_string {
346            //can optimize later
347            let properties_string = CheetahString::from_string(
348                String::from_utf8_lossy(properties.as_slice()).to_string(),
349            );
350            let message_properties = string_to_message_properties(Some(&properties_string));
351            msg_ext.message.properties = message_properties;
352        } else {
353            let properties_string = CheetahString::from_string(
354                String::from_utf8_lossy(properties.as_slice()).to_string(),
355            );
356            let mut message_properties = string_to_message_properties(Some(&properties_string));
357            message_properties.insert(
358                CheetahString::from_static_str("propertiesString"),
359                properties_string,
360            );
361            msg_ext.message.properties = message_properties;
362        }
363    }
364    let msg_id = build_message_id(store_host_address, physic_offset);
365    msg_ext.set_msg_id(CheetahString::from_string(msg_id));
366
367    if is_client {
368        unimplemented!()
369    }
370
371    Some(msg_ext)
372}
373
374pub fn count_inner_msg_num(bytes: Option<Bytes>) -> u32 {
375    match bytes {
376        None => 0,
377        Some(mut bytes) => {
378            let mut count = 0;
379            while bytes.has_remaining() {
380                let size = bytes.slice(0..4).get_i32();
381                if size as usize > bytes.len() {
382                    break;
383                }
384                let _ = bytes.split_to(size as usize);
385                count += 1;
386            }
387            count
388        }
389    }
390}
391
392pub fn encode_messages(messages: &[Message]) -> Bytes {
393    let mut bytes = BytesMut::new();
394    let mut all_size = 0;
395    for message in messages {
396        let message_bytes = encode_message(message);
397        all_size += message_bytes.len();
398        bytes.put_slice(&message_bytes);
399    }
400    bytes.freeze()
401}
402
403pub fn encode_message(message: &Message) -> Bytes {
404    let body = message.body.as_ref().unwrap();
405    let body_len = body.len();
406    let properties = message_properties_to_string(&message.properties);
407    let properties_bytes = properties.as_bytes();
408    let properties_length = properties_bytes.len();
409
410    let store_size = 4 // 1 TOTALSIZE
411             + 4 // 2 MAGICCOD
412             + 4 // 3 BODYCRC
413             + 4 // 4 FLAG
414             + 4 + body_len // 4 BODY
415             + 2 + properties_length;
416
417    let mut bytes = BytesMut::with_capacity(store_size);
418
419    // 1 TOTALSIZE
420    bytes.put_i32(store_size as i32);
421
422    // 2 MAGICCODE
423    bytes.put_i32(0);
424
425    // 3 BODYCRC
426    bytes.put_u32(0);
427
428    // 4 FLAG
429    bytes.put_i32(message.flag);
430
431    // 5 BODY
432    bytes.put_i32(body_len as i32);
433    bytes.put_slice(body);
434
435    // 6 PROPERTIES
436    bytes.put_i16(properties_length as i16);
437    bytes.put_slice(properties_bytes);
438
439    bytes.freeze()
440}
441
442pub fn decodes_batch(
443    byte_buffer: &mut Bytes,
444    read_body: bool,
445    decompress_body: bool,
446) -> Vec<MessageExt> {
447    let mut messages = Vec::new();
448    while byte_buffer.has_remaining() {
449        if let Some(msg_ext) = decode(byte_buffer, read_body, decompress_body, false, false, false)
450        {
451            messages.push(msg_ext);
452        } else {
453            break;
454        }
455    }
456    messages
457}
458
459pub fn decodes_batch_client(
460    byte_buffer: &mut Bytes,
461    read_body: bool,
462    decompress_body: bool,
463) -> Vec<MessageClientExt> {
464    let mut messages = Vec::new();
465    while byte_buffer.has_remaining() {
466        if let Some(msg_ext) = decode_client(byte_buffer, read_body, decompress_body, false, false)
467        {
468            messages.push(msg_ext);
469        } else {
470            break;
471        }
472    }
473    messages
474}
475
476pub fn decode_messages_from(mut message_ext: MessageExt, vec_: &mut Vec<MessageExt>) {
477    let messages = decode_messages(message_ext.message.body.as_mut().unwrap());
478    for message in messages {
479        let mut message_ext_inner = MessageExt {
480            message,
481            ..MessageExt::default()
482        };
483        message_ext_inner.set_topic(message_ext.get_topic().to_owned());
484        message_ext_inner.queue_offset = message_ext.queue_offset;
485        message_ext_inner.queue_id = message_ext.queue_id;
486        message_ext_inner.set_flag(message_ext.get_flag());
487        //MessageAccessor::set_properties(&mut
488        // message_client_ext,message.get_properties().clone()); messageClientExt.
489        // setBody(message.getBody())
490        message_ext_inner.store_host = message_ext.store_host;
491        message_ext_inner.born_host = message_ext.born_host;
492        message_ext_inner.store_timestamp = message_ext.store_timestamp;
493        message_ext_inner.born_timestamp = message_ext.born_timestamp;
494        message_ext_inner.sys_flag = message_ext.sys_flag;
495        message_ext_inner.commit_log_offset = message_ext.commit_log_offset;
496        message_ext_inner.set_wait_store_msg_ok(message_ext.is_wait_store_msg_ok());
497        vec_.push(message_ext_inner);
498    }
499}
500
501pub fn decode_messages(buffer: &mut Bytes) -> Vec<Message> {
502    let mut messages = Vec::new();
503    while buffer.has_remaining() {
504        let message = decode_message(buffer);
505        messages.push(message);
506    }
507    messages
508}
509
510pub fn decode_message(buffer: &mut Bytes) -> Message {
511    // 1 TOTALSIZE
512    let _ = buffer.get_i32();
513
514    // 2 MAGICCODE
515    let _ = buffer.get_i32();
516
517    // 3 BODYCRC
518    let _ = buffer.get_i32();
519
520    // 4 FLAG
521    let flag = buffer.get_i32();
522
523    // 5 BODY
524    let body_len = buffer.get_i32();
525    let body = buffer.split_to(body_len as usize);
526
527    // 6 properties
528    let properties_length = buffer.get_i16();
529    let properties = buffer.split_to(properties_length as usize);
530    //string_to_message_properties(Some(&String::from_utf8_lossy(properties.as_ref()).
531    // to_string()));
532    let message_properties = str_to_message_properties(Some(str::from_utf8(&properties).unwrap()));
533
534    Message {
535        body: Some(body),
536        properties: message_properties,
537        flag,
538        ..Message::default()
539    }
540}
541
542pub fn decode_message_id(msg_id: &str) -> MessageId {
543    let bytes = util_all::string_to_bytes(msg_id).unwrap();
544    let mut buffer = Bytes::from(bytes);
545    let len = if msg_id.len() == 32 {
546        let mut ip = [0u8; 4];
547        buffer.copy_to_slice(&mut ip);
548        let port = buffer.get_i32();
549        SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
550    } else {
551        let mut ip = [0u8; 16];
552        buffer.copy_to_slice(&mut ip);
553        let port = buffer.get_i32();
554        SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
555    };
556    MessageId {
557        address: len,
558        offset: buffer.get_i64(),
559    }
560}
561
562pub fn encode(
563    message_ext: &MessageExt,
564    need_compress: bool,
565) -> rocketmq_error::RocketMQResult<Bytes> {
566    let body = message_ext.get_body().unwrap();
567    let topic = message_ext.get_topic().as_bytes();
568    let topic_len = topic.len();
569    let properties = message_properties_to_string(message_ext.get_properties());
570    let properties_bytes = properties.as_bytes();
571    let properties_length = properties_bytes.len();
572    let sys_flag = message_ext.sys_flag;
573    let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
574        8
575    } else {
576        20
577    };
578    let store_host_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
579        8
580    } else {
581        20
582    };
583    let new_body = if need_compress
584        && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG
585    {
586        let compressor =
587            CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
588        let compressed_body = compressor.compress(body, 5)?;
589        Some(compressed_body)
590    } else {
591        None
592    };
593    let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
594    let store_size = message_ext.store_size;
595    let mut byte_buffer = if store_size > 0 {
596        BytesMut::with_capacity(store_size as usize)
597    } else {
598        let store_size = 4 // 1 TOTALSIZE
599             + 4 // 2 MAGICCODE
600             + 4 // 3 BODYCRC
601             + 4 // 4 QUEUEID
602             + 4 // 5 FLAG
603             + 8 // 6 QUEUEOFFSET
604             + 8 // 7 PHYSICALOFFSET
605             + 4 // 8 SYSFLAG
606             + 8 // 9 BORNTIMESTAMP
607             + born_host_length // 10 BORNHOST
608             + 8 // 11 STORETIMESTAMP
609             + store_host_address_length // 12 STOREHOSTADDRESS
610             + 4 // 13 RECONSUMETIMES
611             + 8 // 14 Prepared Transaction Offset
612             + 4 + body_len // 14 BODY
613             + 1 + topic_len // 15 TOPIC
614             + 2 + properties_length; // 16 propertiesLength
615        BytesMut::with_capacity(store_size)
616    };
617
618    // 1 TOTALSIZE
619    byte_buffer.put_i32(store_size);
620
621    // 2 MAGICCODE
622    byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
623
624    // 3 BODYCRC
625    byte_buffer.put_u32(message_ext.body_crc);
626
627    // 4 QUEUEID
628    byte_buffer.put_i32(message_ext.queue_id);
629
630    // 5 FLAG
631    byte_buffer.put_i32(message_ext.message.flag);
632
633    // 6 QUEUEOFFSET
634    byte_buffer.put_i64(message_ext.queue_offset);
635
636    // 7 PHYSICALOFFSET
637    byte_buffer.put_i64(message_ext.commit_log_offset);
638
639    // 8 SYSFLAG
640    byte_buffer.put_i32(message_ext.sys_flag);
641
642    // 9 BORNTIMESTAMP
643    byte_buffer.put_i64(message_ext.born_timestamp);
644
645    // 10 BORNHOST
646
647    let born_host = message_ext.born_host;
648    match born_host {
649        SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
650        SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
651    };
652
653    byte_buffer.put_i32(born_host.port() as i32);
654
655    // 11 STORETIMESTAMP
656    byte_buffer.put_i64(message_ext.store_timestamp);
657
658    // 12 STOREHOST
659
660    let store_host = message_ext.store_host;
661    match store_host {
662        SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
663        SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
664    };
665
666    byte_buffer.put_i32(store_host.port() as i32);
667
668    // 13 RECONSUMETIMES
669    byte_buffer.put_i32(message_ext.reconsume_times);
670
671    // 14 Prepared Transaction Offset
672    byte_buffer.put_i64(message_ext.prepared_transaction_offset);
673
674    // 15 BODY
675    byte_buffer.put_i32(body_len as i32);
676    if let Some(new_body) = new_body {
677        byte_buffer.put_slice(&new_body);
678    } else {
679        byte_buffer.put_slice(body);
680    }
681
682    // 16 TOPIC
683    byte_buffer.put_u8(topic_len as u8);
684    byte_buffer.put_slice(topic);
685
686    // 17 properties
687    byte_buffer.put_i16(properties_length as i16);
688    byte_buffer.put_slice(properties_bytes);
689
690    Ok(byte_buffer.freeze())
691}
692
693pub fn encode_uniquely(
694    message_ext: &MessageExt,
695    need_compress: bool,
696) -> rocketmq_error::RocketMQResult<Bytes> {
697    let body = message_ext.get_body().unwrap();
698    let topics = message_ext.get_topic().as_bytes();
699    let topic_len = topics.len();
700    let properties = message_properties_to_string(message_ext.get_properties());
701    let properties_bytes = properties.as_bytes();
702    let properties_length = properties_bytes.len();
703    let sys_flag = message_ext.sys_flag;
704    let born_host_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
705        8
706    } else {
707        20
708    };
709    let new_body = if need_compress
710        && (sys_flag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG
711    {
712        let compressor =
713            CompressorFactory::get_compressor(MessageSysFlag::get_compression_type(sys_flag));
714        let compressed_body = compressor.compress(body, 5)?;
715        Some(compressed_body)
716    } else {
717        None
718    };
719    let body_len = new_body.as_ref().map_or(body.len(), |b| b.len());
720    let store_size = message_ext.store_size;
721    let mut byte_buffer = if store_size > 0 {
722        BytesMut::with_capacity((store_size - 8) as usize)
723    } else {
724        let store_size = 4 // 1 TOTALSIZE
725             + 4 // 2 MAGICCODE
726             + 4 // 3 BODYCRC
727             + 4 // 4 QUEUEID
728             + 4 // 5 FLAG
729             + 8 // 6 QUEUEOFFSET
730             + 8 // 7 PHYSICALOFFSET
731             + 4 // 8 SYSFLAG
732             + 8 // 9 BORNTIMESTAMP
733             + born_host_length // 10 BORNHOST
734             + 4 // 11 RECONSUMETIMES
735             + 8 // 12 Prepared Transaction Offset
736             + 4 + body_len // 13 BODY
737             + 1 + topic_len // 14 TOPIC
738             + 2 + properties_length; // 15 propertiesLength
739        BytesMut::with_capacity(store_size)
740    };
741
742    // 1 TOTALSIZE
743    byte_buffer.put_i32(store_size);
744
745    // 2 MAGICCODE
746    byte_buffer.put_i32(MESSAGE_MAGIC_CODE);
747
748    // 3 BODYCRC
749    byte_buffer.put_u32(message_ext.body_crc);
750
751    // 4 QUEUEID
752    byte_buffer.put_i32(message_ext.queue_id);
753
754    // 5 FLAG
755    byte_buffer.put_i32(message_ext.message.flag);
756
757    // 6 QUEUEOFFSET
758    byte_buffer.put_i64(message_ext.queue_offset);
759
760    // 7 PHYSICALOFFSET
761    byte_buffer.put_i64(message_ext.commit_log_offset);
762
763    // 8 SYSFLAG
764    byte_buffer.put_i32(message_ext.sys_flag);
765
766    // 9 BORNTIMESTAMP
767    byte_buffer.put_i64(message_ext.born_timestamp);
768
769    // 10 BORNHOST
770
771    let born_host = message_ext.born_host;
772    match born_host {
773        SocketAddr::V4(value) => byte_buffer.extend(value.ip().octets()),
774        SocketAddr::V6(value) => byte_buffer.extend(value.ip().octets()),
775    };
776    byte_buffer.put_i32(born_host.port() as i32);
777
778    // 11 RECONSUMETIMES
779    byte_buffer.put_i32(message_ext.reconsume_times);
780
781    // 12 Prepared Transaction Offset
782    byte_buffer.put_i64(message_ext.prepared_transaction_offset);
783
784    // 13 BODY
785    byte_buffer.put_i32(body_len as i32);
786    if let Some(new_body) = new_body {
787        byte_buffer.put_slice(&new_body);
788    } else {
789        byte_buffer.put_slice(body);
790    }
791
792    // 14 TOPIC
793    byte_buffer.put_i16(topic_len as i16);
794    byte_buffer.put_slice(topics);
795
796    // 15 properties
797    byte_buffer.put_i16(properties_length as i16);
798    byte_buffer.put_slice(properties_bytes);
799
800    Ok(byte_buffer.freeze())
801}
802
803pub fn create_crc32(mut input: &mut [u8], crc32: u32) {
804    input.put(MessageConst::PROPERTY_CRC32.as_bytes());
805    input.put_u8(NAME_VALUE_SEPARATOR as u8);
806    let mut crc32 = crc32;
807    for _ in 0..10 {
808        let mut b = b'0';
809        if crc32 > 0 {
810            b += (crc32 % 10) as u8;
811            crc32 /= 10;
812        }
813        input.put_u8(b);
814    }
815    input.put_u8(PROPERTY_SEPARATOR as u8);
816}
817
818pub fn decode_properties(bytes: &mut Bytes) -> Option<HashMap<CheetahString, CheetahString>> {
819    // Ensure we have enough bytes to read SYSFLAG and MAGICCODE.
820    if bytes.len() < SYSFLAG_POSITION + 4 {
821        return None;
822    }
823
824    // Read sysFlag and magicCode using fixed positions.
825    let sys_flag = BigEndian::read_i32(&bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4]);
826    let magic_code =
827        BigEndian::read_i32(&bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4]);
828    let version = match MessageVersion::value_of_magic_code(magic_code) {
829        Ok(value) => value,
830        Err(_) => return None,
831    };
832
833    // Determine address lengths.
834    let bornhost_length = if (sys_flag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 {
835        8
836    } else {
837        20
838    };
839    let storehost_address_length = if (sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG) == 0 {
840        8
841    } else {
842        20
843    };
844
845    // Calculate the bodySizePosition as in Java.
846    let body_size_position = 4   // TOTALSIZE
847        + 4   // MAGICCODE
848        + 4   // BODYCRC
849        + 4   // QUEUEID
850        + 4   // FLAG
851        + 8   // QUEUEOFFSET
852        + 8   // PHYSICALOFFSET
853        + 4   // SYSFLAG
854        + 8   // BORNTIMESTAMP
855        + bornhost_length // BORNHOST
856        + 8   // STORETIMESTAMP
857        + storehost_address_length // STOREHOSTADDRESS
858        + 4   // RECONSUMETIMES
859        + 8; // Prepared Transaction Offset
860
861    if bytes.len() < body_size_position + 4 {
862        return None;
863    }
864
865    // Read the body size stored as an int.
866    let body_size =
867        BigEndian::read_i32(&bytes[body_size_position..body_size_position + 4]) as usize;
868
869    // Compute the topic length position.
870    let topic_length_position = body_size_position + 4 + body_size;
871    if bytes.len() < topic_length_position {
872        return None;
873    }
874
875    // Create a Cursor over the slice starting at the topic length position.
876    let slice = &bytes[topic_length_position..];
877    let cursor = Cursor::new(slice);
878    let topic_length_size = version.get_topic_length_size();
879    bytes.advance(topic_length_position);
880    let topic_length = version.get_topic_length(bytes);
881
882    // Calculate the properties position.
883    let properties_position = topic_length_position + topic_length_size + topic_length;
884    if bytes.len() < properties_position + 2 {
885        return None;
886    }
887
888    // Read a short (2 bytes) as propertiesLength.
889    let properties_length =
890        BigEndian::read_i16(&bytes[properties_position..properties_position + 2]);
891
892    // Advance past the short value.
893    let properties_start = properties_position + 2;
894    if properties_length > 0 {
895        let end = properties_start + (properties_length as usize);
896        if bytes.len() < end {
897            return None;
898        }
899        let properties_bytes = &bytes[properties_start..end];
900        if let Ok(properties_string) = String::from_utf8(properties_bytes.to_vec()) {
901            Some(string_to_message_properties(Some(
902                &CheetahString::from_string(properties_string),
903            )))
904        } else {
905            None
906        }
907    } else {
908        None
909    }
910}
911
912#[cfg(test)]
913mod tests {
914    use bytes::BufMut;
915    use bytes::BytesMut;
916
917    use super::*;
918
919    #[test]
920    fn count_inner_msg_num_counts_correctly_for_multiple_messages() {
921        let mut bytes = BytesMut::new();
922        bytes.put_i32(8);
923        bytes.put_slice(&[0, 0, 0, 0]);
924        bytes.put_i32(8);
925        bytes.put_slice(&[0, 0, 0, 0]);
926        assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 2);
927    }
928
929    #[test]
930    fn count_inner_msg_num_counts_correctly_for_single_message() {
931        let mut bytes = BytesMut::new();
932        bytes.put_i32(8);
933        bytes.put_slice(&[0, 0, 0, 0]);
934        assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 1);
935    }
936
937    #[test]
938    fn count_inner_msg_num_counts_zero_for_no_messages() {
939        let bytes = BytesMut::new();
940        assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 0);
941    }
942
943    #[test]
944    fn count_inner_msg_num_ignores_incomplete_messages() {
945        let mut bytes = BytesMut::new();
946        bytes.put_i32(4);
947        assert_eq!(count_inner_msg_num(Some(bytes.freeze())), 1);
948    }
949
950    #[test]
951    fn decode_message_id_ipv4() {
952        let msg_id = "7F0000010007D8260BF075769D36C348";
953        let message_id = decode_message_id(msg_id);
954        assert_eq!(message_id.address, "127.0.0.1:55334".parse().unwrap());
955        assert_eq!(message_id.offset, 860316681131967304);
956    }
957
958    #[test]
959    fn encode_with_compression() {
960        let mut message_ext = MessageExt::default();
961        message_ext.set_body(Bytes::from("Hello, World!"));
962        let result = encode(&message_ext, true);
963        assert!(result.is_ok());
964        let bytes = result.unwrap();
965        assert!(!bytes.is_empty());
966    }
967
968    #[test]
969    fn encode_without_compression() {
970        let mut message_ext = MessageExt::default();
971        message_ext.set_body(Bytes::from("Hello, World!"));
972        let result = encode(&message_ext, false);
973        assert!(result.is_ok());
974        let bytes = result.unwrap();
975        assert!(!bytes.is_empty());
976    }
977
978    #[test]
979    fn encode_with_empty_body() {
980        let mut message_ext = MessageExt::default();
981        message_ext.set_body(Bytes::new());
982        let result = encode(&message_ext, false);
983        assert!(result.is_ok());
984        let bytes = result.unwrap();
985        assert!(!bytes.is_empty());
986    }
987
988    #[test]
989    fn encode_with_large_body() {
990        let mut message_ext = MessageExt::default();
991        let large_body = vec![0u8; 1024 * 1024];
992        message_ext.set_body(Bytes::from(large_body));
993        let result = encode(&message_ext, false);
994        assert!(result.is_ok());
995        let bytes = result.unwrap();
996        assert!(!bytes.is_empty());
997    }
998
999    #[test]
1000    fn encode_uniquely_with_compression() {
1001        let mut message_ext = MessageExt::default();
1002        message_ext.set_body(Bytes::from("Hello, World!"));
1003        let result = encode_uniquely(&message_ext, true);
1004        assert!(result.is_ok());
1005        let bytes = result.unwrap();
1006        assert!(!bytes.is_empty());
1007    }
1008
1009    #[test]
1010    fn encode_uniquely_without_compression() {
1011        let mut message_ext = MessageExt::default();
1012        message_ext.set_body(Bytes::from("Hello, World!"));
1013        let result = encode_uniquely(&message_ext, false);
1014        assert!(result.is_ok());
1015        let bytes = result.unwrap();
1016        assert!(!bytes.is_empty());
1017    }
1018
1019    #[test]
1020    fn encode_uniquely_with_empty_body() {
1021        let mut message_ext = MessageExt::default();
1022        message_ext.set_body(Bytes::new());
1023        let result = encode_uniquely(&message_ext, false);
1024        assert!(result.is_ok());
1025        let bytes = result.unwrap();
1026        assert!(!bytes.is_empty());
1027    }
1028
1029    #[test]
1030    fn encode_uniquely_with_large_body() {
1031        let mut message_ext = MessageExt::default();
1032        let large_body = vec![0u8; 1024 * 1024];
1033        message_ext.set_body(Bytes::from(large_body));
1034        let result = encode_uniquely(&message_ext, false);
1035        assert!(result.is_ok());
1036        let bytes = result.unwrap();
1037        assert!(!bytes.is_empty());
1038    }
1039
1040    #[test]
1041    fn decode_properties_returns_none_if_bytes_length_is_insufficient() {
1042        let mut bytes = Bytes::from(vec![0; SYSFLAG_POSITION + 3]);
1043        assert!(decode_properties(&mut bytes).is_none());
1044    }
1045
1046    #[test]
1047    fn decode_properties_returns_none_if_magic_code_is_invalid() {
1048        let mut bytes =
1049            BytesMut::from_iter(vec![
1050                0u8;
1051                SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
1052            ]);
1053        BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1054        BigEndian::write_i32(
1055            &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1056            -1,
1057        );
1058        assert!(decode_properties(&mut bytes.freeze()).is_none());
1059    }
1060
1061    #[test]
1062    fn decode_properties_returns_none_if_body_size_is_insufficient() {
1063        let mut bytes = BytesMut::from_iter(vec![
1064            0u8;
1065            SYSFLAG_POSITION
1066                + 4
1067                + MESSAGE_MAGIC_CODE_POSITION
1068                + 4
1069                + 4
1070        ]);
1071        BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1072        BigEndian::write_i32(
1073            &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1074            MESSAGE_MAGIC_CODE,
1075        );
1076        assert!(decode_properties(&mut bytes.freeze()).is_none());
1077    }
1078
1079    #[test]
1080    fn decode_properties_returns_none_if_topic_length_is_insufficient() {
1081        let mut bytes = BytesMut::from_iter(vec![
1082            0u8;
1083            SYSFLAG_POSITION
1084                + 4
1085                + MESSAGE_MAGIC_CODE_POSITION
1086                + 4
1087                + 4
1088                + 4
1089        ]);
1090        BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1091        BigEndian::write_i32(
1092            &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1093            MESSAGE_MAGIC_CODE,
1094        );
1095        BigEndian::write_i32(
1096            &mut bytes[SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
1097                ..SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4],
1098            0,
1099        );
1100        assert!(decode_properties(&mut bytes.freeze()).is_none());
1101    }
1102
1103    #[test]
1104    fn decode_properties_returns_none_if_properties_length_is_insufficient() {
1105        let mut bytes = BytesMut::from_iter(vec![
1106            0u8;
1107            SYSFLAG_POSITION
1108                + 4
1109                + MESSAGE_MAGIC_CODE_POSITION
1110                + 4
1111                + 4
1112                + 4
1113                + 2
1114        ]);
1115        BigEndian::write_i32(&mut bytes[SYSFLAG_POSITION..SYSFLAG_POSITION + 4], 0);
1116        BigEndian::write_i32(
1117            &mut bytes[MESSAGE_MAGIC_CODE_POSITION..MESSAGE_MAGIC_CODE_POSITION + 4],
1118            MESSAGE_MAGIC_CODE,
1119        );
1120        BigEndian::write_i32(
1121            &mut bytes[SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4
1122                ..SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4],
1123            0,
1124        );
1125        BigEndian::write_i16(
1126            &mut bytes[SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4 + 4
1127                ..SYSFLAG_POSITION + 4 + MESSAGE_MAGIC_CODE_POSITION + 4 + 4 + 4 + 2],
1128            1,
1129        );
1130        assert!(decode_properties(&mut bytes.freeze()).is_none());
1131    }
1132}