rocketmq_remoting/protocol/
remoting_command.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::fmt;
20use std::hint;
21use std::sync::atomic::AtomicI32;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::sync::Once;
25
26use bytes::Buf;
27use bytes::BufMut;
28use bytes::Bytes;
29use bytes::BytesMut;
30use cheetah_string::CheetahString;
31use lazy_static::lazy_static;
32use rocketmq_common::common::mq_version::RocketMqVersion;
33use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
34use rocketmq_common::EnvUtils::EnvUtils;
35use rocketmq_error::RocketmqError;
36use rocketmq_rust::ArcMut;
37use serde::Deserialize;
38use serde::Serialize;
39use tracing::error;
40
41use super::RemotingCommandType;
42use super::RemotingSerializable;
43use super::SerializeType;
44use crate::code::response_code::RemotingSysResponseCode;
45use crate::protocol::command_custom_header::CommandCustomHeader;
46use crate::protocol::command_custom_header::FromMap;
47use crate::protocol::LanguageCode;
48use crate::rocketmq_serializable::RocketMQSerializable;
49
/// Name of the setting that selects the wire serialization type; looked up first via
/// `std::env::var` when `SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER` is initialised.
pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";
/// Environment variable consulted when `SERIALIZE_TYPE_PROPERTY` cannot be read.
pub const SERIALIZE_TYPE_ENV: &str = "ROCKETMQ_SERIALIZE_TYPE";
/// Key passed to `EnvUtils::get_property` to obtain the remoting protocol version.
pub const REMOTING_VERSION_KEY: &str = "rocketmq.remoting.version";
53
54lazy_static! {
55    static ref requestId: Arc<AtomicI32> = Arc::new(AtomicI32::new(0));
56    static ref CONFIG_VERSION: i32 = {
57        EnvUtils::get_property(REMOTING_VERSION_KEY)
58            .unwrap_or(String::from("0"))
59            .parse::<i32>()
60            .unwrap_or(0)
61    };
62    static ref INIT: Once = Once::new();
63    pub static ref SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER: SerializeType = {
64        let protocol = std::env::var(SERIALIZE_TYPE_PROPERTY).unwrap_or_else(|_| {
65            std::env::var(SERIALIZE_TYPE_ENV).unwrap_or_else(|_| "".to_string())
66        });
67        match protocol.as_str() {
68            "JSON" => SerializeType::JSON,
69            "ROCKETMQ" => SerializeType::ROCKETMQ,
70            _ => SerializeType::JSON,
71        }
72    };
73}
74
75fn set_cmd_version(cmd: &mut RemotingCommand) {
76    cmd.set_version_ref(*CONFIG_VERSION);
77}
78
/// A remoting protocol command: the header fields that are serialized on the wire plus
/// the in-memory-only body, custom header and `suspended` marker.
///
/// Field order and the serde renames below determine the JSON header layout.
#[derive(Serialize, Deserialize)]
pub struct RemotingCommand {
    /// Command code; set through `set_code` and friends.
    code: i32,
    /// Language of the peer that built the command (`LanguageCode::RUST` by default).
    language: LanguageCode,
    /// Protocol version; `create_request_command` fills it from `CONFIG_VERSION`.
    version: i32,
    /// Identifier taken from the process-wide request counter when the command is created.
    opaque: i32,

    /// Bit flags.
    /// Bit 0 (`RPC_TYPE`): non-zero marks a response command, zero a request command.
    /// Bit 1 (`RPC_ONEWAY`): non-zero marks a one-way request.
    flag: i32,
    /// Optional free-form remark text.
    remark: Option<CheetahString>,

    /// Extension key/value pairs, serialized as `extFields`; `make_custom_header_to_net`
    /// merges the custom header's map into it before encoding.
    #[serde(rename = "extFields")]
    ext_fields: Option<HashMap<CheetahString, CheetahString>>,

    /// Payload bytes; excluded from header serialization.
    #[serde(skip)]
    body: Option<Bytes>,
    /// In-memory marker, excluded from serialization.
    /// NOTE(review): nothing in this file reads it beyond the getter; confirm its meaning with callers.
    #[serde(skip)]
    suspended: bool,
    /// Typed custom header, excluded from serialization (it travels as `ext_fields`).
    #[serde(skip)]
    command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
    /// Serialization type used for this command, serialized as `serializeTypeCurrentRPC`.
    #[serde(rename = "serializeTypeCurrentRPC")]
    serialize_type: SerializeType,
}
106
107impl Clone for RemotingCommand {
108    fn clone(&self) -> Self {
109        Self {
110            code: self.code,
111            language: self.language,
112            version: self.version,
113            opaque: self.opaque,
114            flag: self.flag,
115            remark: self.remark.clone(),
116            ext_fields: self.ext_fields.clone(),
117            body: self.body.clone(),
118            suspended: self.suspended,
119            command_custom_header: self.command_custom_header.clone(),
120            serialize_type: self.serialize_type,
121        }
122    }
123}
124
125impl fmt::Display for RemotingCommand {
126    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127        write!(
128            f,
129            "RemotingCommand [code={}, language={}, version={}, opaque={}, flag(B)={:b}, \
130             remark={}, extFields={:?}, serializeTypeCurrentRPC={}]",
131            self.code,
132            self.language,
133            self.version,
134            self.opaque,
135            self.flag,
136            self.remark.as_ref().unwrap_or(&CheetahString::default()),
137            self.ext_fields,
138            self.serialize_type
139        )
140    }
141}
142
143impl Default for RemotingCommand {
144    fn default() -> Self {
145        let opaque = requestId.fetch_add(1, Ordering::AcqRel);
146        RemotingCommand {
147            code: 0,
148            language: LanguageCode::RUST, // Replace with your actual enum variant
149            version: 0,
150            opaque,
151            flag: 0,
152            remark: None,
153            ext_fields: None,
154            body: None,
155            suspended: false,
156            command_custom_header: None,
157            serialize_type: *SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER,
158        }
159    }
160}
161
impl RemotingCommand {
    /// Bit index within `flag` that marks a one-way request (see `mark_oneway_rpc`).
    pub(crate) const RPC_ONEWAY: i32 = 1;
    /// Bit index within `flag` that marks a response command (see `mark_response_type`).
    pub(crate) const RPC_TYPE: i32 = 0;
}
166
167impl RemotingCommand {
168    pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
169        Self::default().set_code(code).set_body(body)
170    }
171
172    pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
173    where
174        T: CommandCustomHeader + Sync + Send + 'static,
175    {
176        let mut command = Self::default()
177            .set_code(code.into())
178            .set_command_custom_header(header);
179        set_cmd_version(&mut command);
180        command
181    }
182
183    pub fn create_remoting_command(code: impl Into<i32>) -> Self {
184        let command = Self::default();
185        command.set_code(code.into())
186    }
187
188    pub fn get_and_add() -> i32 {
189        requestId.fetch_add(1, Ordering::AcqRel)
190    }
191
192    pub fn create_response_command_with_code(code: impl Into<i32>) -> Self {
193        Self::default().set_code(code).mark_response_type()
194    }
195
196    pub fn create_response_command_with_code_remark(
197        code: impl Into<i32>,
198        remark: impl Into<CheetahString>,
199    ) -> Self {
200        Self::default()
201            .set_code(code)
202            .set_remark_option(Some(remark.into()))
203            .mark_response_type()
204    }
205
206    pub fn create_response_command() -> Self {
207        Self::default()
208            .set_code(RemotingSysResponseCode::Success)
209            .mark_response_type()
210    }
211
212    pub fn create_response_command_with_header(
213        header: impl CommandCustomHeader + Sync + Send + 'static,
214    ) -> Self {
215        Self::default()
216            .set_code(RemotingSysResponseCode::Success)
217            .set_command_custom_header(header)
218            .mark_response_type()
219    }
220
221    pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
222    where
223        T: CommandCustomHeader + Sync + Send + 'static,
224    {
225        self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
226        self
227    }
228
229    pub fn set_command_custom_header_origin(
230        mut self,
231        command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
232    ) -> Self {
233        self.command_custom_header = command_custom_header;
234        self
235    }
236
237    pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
238    where
239        T: CommandCustomHeader + Sync + Send + 'static,
240    {
241        self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
242    }
243
244    pub fn set_code(mut self, code: impl Into<i32>) -> Self {
245        self.code = code.into();
246        self
247    }
248
249    pub fn set_code_ref(&mut self, code: impl Into<i32>) {
250        self.code = code.into();
251    }
252
253    pub fn set_code_mut(&mut self, code: impl Into<i32>) -> &mut Self {
254        self.code = code.into();
255        self
256    }
257
258    pub fn set_language(mut self, language: LanguageCode) -> Self {
259        self.language = language;
260        self
261    }
262
    /// Sets the protocol version in place.
    pub fn set_version_ref(&mut self, version: i32) {
        self.version = version;
    }
266
267    pub fn set_version(mut self, version: i32) -> Self {
268        self.version = version;
269        self
270    }
271
272    #[inline]
273    pub fn set_opaque(mut self, opaque: i32) -> Self {
274        self.opaque = opaque;
275        self
276    }
277
    /// Overrides the `opaque` identifier in place.
    #[inline]
    pub fn set_opaque_mut(&mut self, opaque: i32) {
        self.opaque = opaque;
    }
282
283    #[inline]
284    pub fn set_flag(mut self, flag: i32) -> Self {
285        self.flag = flag;
286        self
287    }
288
289    #[inline]
290    pub fn set_remark_option(mut self, remark: Option<impl Into<CheetahString>>) -> Self {
291        self.remark = remark.map(|item| item.into());
292        self
293    }
294
295    #[inline]
296    pub fn set_remark(mut self, remark: impl Into<CheetahString>) -> Self {
297        self.remark = Some(remark.into());
298        self
299    }
300
301    #[inline]
302    pub fn set_remark_option_mut(&mut self, remark: Option<impl Into<CheetahString>>) {
303        self.remark = remark.map(|item| item.into());
304    }
305
306    #[inline]
307    pub fn set_remark_mut(&mut self, remark: impl Into<CheetahString>) {
308        self.remark = Some(remark.into());
309    }
310
311    #[inline]
312    pub fn set_ext_fields(mut self, ext_fields: HashMap<CheetahString, CheetahString>) -> Self {
313        self.ext_fields = Some(ext_fields);
314        self
315    }
316
317    #[inline]
318    pub fn set_body(mut self, body: impl Into<Bytes>) -> Self {
319        self.body = Some(body.into());
320        self
321    }
322
323    #[inline]
324    pub fn set_body_mut_ref(&mut self, body: impl Into<Bytes>) {
325        self.body = Some(body.into());
326    }
327
328    #[inline]
329    pub fn set_suspended(mut self, suspended: bool) -> Self {
330        self.suspended = suspended;
331        self
332    }
333
    /// Sets the `suspended` marker in place.
    #[inline]
    pub fn set_suspended_ref(&mut self, suspended: bool) {
        self.suspended = suspended;
    }
338
339    #[inline]
340    pub fn set_serialize_type(mut self, serialize_type: SerializeType) -> Self {
341        self.serialize_type = serialize_type;
342        self
343    }
344
345    #[inline]
346    pub fn mark_response_type(mut self) -> Self {
347        let mark = 1 << Self::RPC_TYPE;
348        self.flag |= mark;
349        self
350    }
351
352    #[inline]
353    pub fn mark_response_type_ref(&mut self) {
354        let mark = 1 << Self::RPC_TYPE;
355        self.flag |= mark;
356    }
357
358    #[inline]
359    pub fn mark_oneway_rpc(mut self) -> Self {
360        let mark = 1 << Self::RPC_ONEWAY;
361        self.flag |= mark;
362        self
363    }
364
365    #[inline]
366    pub fn mark_oneway_rpc_ref(&mut self) {
367        let mark = 1 << Self::RPC_ONEWAY;
368        self.flag |= mark;
369    }
370
371    #[inline]
372    pub fn get_serialize_type(&self) -> SerializeType {
373        self.serialize_type
374    }
375
376    pub fn header_encode(&mut self) -> Option<Bytes> {
377        self.make_custom_header_to_net();
378        if SerializeType::ROCKETMQ == self.serialize_type {
379            Some(RocketMQSerializable::rocket_mq_protocol_encode_bytes(self))
380        } else {
381            match self.encode() {
382                Ok(value) => Some(Bytes::from(value)),
383                Err(e) => {
384                    error!("Failed to encode generic: {}", e);
385                    None
386                }
387            }
388        }
389    }
390
391    pub fn encode_header(&mut self) -> Option<Bytes> {
392        if let Some(body) = &self.body {
393            let size = body.len();
394            self.encode_header_with_body_length(size)
395        } else {
396            self.encode_header_with_body_length(0)
397        }
398    }
399    pub fn encode_header_with_body_length(&mut self, body_length: usize) -> Option<Bytes> {
400        //for zero copy
401        // 1> header length size
402        let mut length = 4;
403
404        // 2> header data length
405        let header_data = self.header_encode().unwrap();
406
407        length += header_data.len();
408
409        // 3> body data length
410        length += body_length;
411
412        let mut result = BytesMut::with_capacity(4 + length - body_length);
413
414        // length
415        result.put_i32(length as i32);
416
417        // header length
418        result.put_i32(mark_protocol_type(
419            header_data.len() as i32,
420            self.serialize_type,
421        ));
422
423        // header data
424        result.put(header_data);
425
426        Some(result.freeze())
427    }
428
429    pub fn make_custom_header_to_net(&mut self) {
430        if let Some(header) = &self.command_custom_header {
431            let option = header.to_map();
432
433            match &mut self.ext_fields {
434                None => {
435                    self.ext_fields = option;
436                }
437                Some(ext) => {
438                    if let Some(val) = option {
439                        for (key, value) in &val {
440                            ext.insert(key.clone(), value.clone());
441                        }
442                    }
443                }
444            }
445        }
446    }
447
448    pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
449        match self.serialize_type {
450            SerializeType::JSON => {
451                self.make_custom_header_to_net();
452                let header = match serde_json::to_vec(self) {
453                    Ok(value) => Some(value),
454                    Err(e) => {
455                        error!("Failed to encode generic: {}", e);
456                        None
457                    }
458                };
459                let header_length = header.as_ref().map_or(0, |h| h.len()) as i32;
460                let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
461                let total_length = 4 + header_length + body_length;
462
463                dst.reserve((total_length + 4) as usize);
464                dst.put_i32(total_length);
465                let serialize_type =
466                    RemotingCommand::mark_serialize_type(header_length, SerializeType::JSON);
467                dst.put_i32(serialize_type);
468
469                if let Some(header_inner) = header {
470                    dst.put(header_inner.as_slice());
471                }
472            }
473            SerializeType::ROCKETMQ => {
474                let begin_index = dst.len();
475                dst.put_i64(0);
476                if let Some(header) = self.command_custom_header_ref() {
477                    if !header.support_fast_codec() {
478                        self.make_custom_header_to_net();
479                    }
480                }
481                let header_size = RocketMQSerializable::rocketmq_protocol_encode(self, dst);
482                let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
483                let serialize_type = RemotingCommand::mark_serialize_type(
484                    header_size as i32,
485                    SerializeType::ROCKETMQ,
486                );
487                dst[begin_index..begin_index + 4]
488                    .copy_from_slice(&(header_size as i32 + body_length).to_be_bytes());
489                dst[begin_index + 4..begin_index + 8]
490                    .copy_from_slice(&serialize_type.to_be_bytes());
491            }
492        }
493    }
494
495    pub fn decode(src: &mut BytesMut) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
496        let read_to = src.len();
497        if read_to < 4 {
498            // Wait for more data when there are less than 4 bytes.
499            return Ok(None);
500        }
501        //Read the total size as a big-endian i32 from the first 4 bytes.
502        let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
503
504        if read_to < total_size + 4 {
505            // Wait for more data when the available data is less than the total size.
506            return Ok(None);
507        }
508        // Split the BytesMut to get the command data including the total size.
509        let mut cmd_data = src.split_to(total_size + 4);
510        // Discard the first i32 (total size).
511        cmd_data.advance(4);
512        if cmd_data.remaining() < 4 {
513            return Ok(None);
514        }
515        // Read the header length as a big-endian i32.
516        let ori_header_length = cmd_data.get_i32();
517        let header_length = parse_header_length(ori_header_length);
518        if header_length > total_size - 4 {
519            return Err(RocketmqError::RemotingCommandDecoderError(format!(
520                "Header length {header_length} is greater than total size {total_size}"
521            )));
522        }
523        let protocol_type = parse_serialize_type(ori_header_length)?;
524        // Assume the header is of i32 type and directly get it from the data.
525        let mut header_data = cmd_data.split_to(header_length);
526
527        let mut cmd =
528            RemotingCommand::header_decode(&mut header_data, header_length, protocol_type)?;
529
530        if let Some(cmd) = cmd.as_mut() {
531            if total_size - 4 > header_length {
532                cmd.set_body_mut_ref(cmd_data.split_to(total_size - 4 - header_length).freeze());
533            }
534        }
535        Ok(cmd)
536    }
537
538    pub fn header_decode(
539        src: &mut BytesMut,
540        header_length: usize,
541        type_: SerializeType,
542    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
543        match type_ {
544            SerializeType::JSON => {
545                let cmd =
546                    SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
547                        // Handle deserialization error gracefully
548                        RocketmqError::RemotingCommandDecoderError(format!(
549                            "Deserialization error: {error}"
550                        ))
551                    })?;
552
553                Ok(Some(cmd.set_serialize_type(SerializeType::JSON)))
554            }
555            SerializeType::ROCKETMQ => {
556                let cmd = RocketMQSerializable::rocket_mq_protocol_decode(src, header_length)?;
557                Ok(Some(cmd.set_serialize_type(SerializeType::ROCKETMQ)))
558            }
559        }
560    }
561
562    #[inline]
563    pub fn get_body(&self) -> Option<&Bytes> {
564        self.body.as_ref()
565    }
566
    /// Returns a mutable reference to the body payload, if any.
    #[inline]
    pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
        self.body.as_mut()
    }
571
572    #[inline]
573    pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
574        ((protocol_type.get_code() as i32) << 24) | (header_length & 0x00FFFFFF)
575    }
576
    /// Returns the command code.
    #[inline]
    pub fn code(&self) -> i32 {
        self.code
    }
581
    /// Returns the language code recorded on the command.
    #[inline]
    pub fn language(&self) -> LanguageCode {
        self.language
    }
586
    /// Returns the raw protocol version number.
    #[inline]
    pub fn version(&self) -> i32 {
        self.version
    }
591
592    pub fn rocketmq_version(&self) -> RocketMqVersion {
593        RocketMqVersion::from_ordinal(self.version as u32)
594    }
595
    /// Returns the `opaque` identifier of this command.
    #[inline]
    pub fn opaque(&self) -> i32 {
        self.opaque
    }
600
    /// Returns the raw `flag` bit set (bit 0: response, bit 1: one-way).
    #[inline]
    pub fn flag(&self) -> i32 {
        self.flag
    }
605
    /// Returns the remark, if one is set.
    #[inline]
    pub fn remark(&self) -> Option<&CheetahString> {
        self.remark.as_ref()
    }
610
    /// Returns the extension fields, if any are present.
    #[inline]
    pub fn ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
        self.ext_fields.as_ref()
    }
615
    /// Returns a reference to the optional body payload.
    #[inline]
    pub fn body(&self) -> &Option<Bytes> {
        &self.body
    }
620
621    #[inline]
622    pub fn take_body(&mut self) -> Option<Bytes> {
623        self.body.take()
624    }
625
    /// Returns the `suspended` marker.
    #[inline]
    pub fn suspended(&self) -> bool {
        self.suspended
    }
630
    /// Returns the serialization type used by this command.
    #[inline]
    pub fn serialize_type(&self) -> SerializeType {
        self.serialize_type
    }
635
636    pub fn decode_command_custom_header<T>(&self) -> rocketmq_error::RocketMQResult<T>
637    where
638        T: FromMap<Target = T, Error = RocketmqError>,
639    {
640        match self.ext_fields {
641            None => Err(RocketmqError::DeserializeHeaderError(
642                "ExtFields is None".to_string(),
643            )),
644            Some(ref header) => T::from(header),
645        }
646    }
647
648    pub fn decode_command_custom_header_fast<T>(&self) -> rocketmq_error::RocketMQResult<T>
649    where
650        T: FromMap<Target = T, Error = RocketmqError>,
651        T: Default + CommandCustomHeader,
652    {
653        match self.ext_fields {
654            None => Err(RocketmqError::DeserializeHeaderError(
655                "ExtFields is None".to_string(),
656            )),
657            Some(ref header) => {
658                let mut target = T::default();
659                if target.support_fast_codec() {
660                    target.decode_fast(header)?;
661                    Ok(target)
662                } else {
663                    T::from(header)
664                }
665            }
666        }
667    }
668
669    #[inline]
670    pub fn is_response_type(&self) -> bool {
671        let bits = 1 << Self::RPC_TYPE;
672        (self.flag & bits) == bits
673    }
674
675    #[inline]
676    pub fn is_oneway_rpc(&self) -> bool {
677        let bits = 1 << Self::RPC_ONEWAY;
678        (self.flag & bits) == bits
679    }
680
681    pub fn get_type(&self) -> RemotingCommandType {
682        if self.is_response_type() {
683            RemotingCommandType::RESPONSE
684        } else {
685            RemotingCommandType::REQUEST
686        }
687    }
688
689    #[inline]
690    pub fn with_opaque(&mut self, opaque: i32) -> &mut Self {
691        self.opaque = opaque;
692        self
693    }
694
695    pub fn add_ext_field(
696        &mut self,
697        key: impl Into<CheetahString>,
698        value: impl Into<CheetahString>,
699    ) -> &mut Self {
700        if let Some(ref mut ext) = self.ext_fields {
701            ext.insert(key.into(), value.into());
702        }
703        self
704    }
705
706    #[inline]
707    pub fn with_code(&mut self, code: impl Into<i32>) -> &mut Self {
708        self.code = code.into();
709        self
710    }
711
712    #[inline]
713    pub fn with_remark(&mut self, remark: impl Into<CheetahString>) -> &mut Self {
714        self.remark = Some(remark.into());
715        self
716    }
717
718    #[inline]
719    pub fn get_ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
720        self.ext_fields.as_ref()
721    }
722
723    pub fn read_custom_header_ref<T>(&self) -> Option<&T>
724    where
725        T: CommandCustomHeader + Sync + Send + 'static,
726    {
727        match self.command_custom_header.as_ref() {
728            None => None,
729            Some(value) => value.as_ref().as_any().downcast_ref::<T>(),
730        }
731    }
732
733    pub fn read_custom_header_ref_unchecked<T>(&self) -> &T
734    where
735        T: CommandCustomHeader + Sync + Send + 'static,
736    {
737        match self.command_custom_header.as_ref() {
738            None => unsafe { hint::unreachable_unchecked() },
739            Some(value) => value.as_ref().as_any().downcast_ref::<T>().unwrap(),
740        }
741    }
742
743    pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
744    where
745        T: CommandCustomHeader + Sync + Send + 'static,
746    {
747        match self.command_custom_header.as_mut() {
748            None => None,
749            Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>(),
750        }
751    }
752
753    pub fn read_custom_header_mut_from_ref<T>(&self) -> Option<&mut T>
754    where
755        T: CommandCustomHeader + Sync + Send + 'static,
756    {
757        match self.command_custom_header.as_ref() {
758            None => None,
759            Some(value) => value.mut_from_ref().as_any_mut().downcast_mut::<T>(),
760        }
761    }
762
763    pub fn read_custom_header_mut_unchecked<T>(&mut self) -> &mut T
764    where
765        T: CommandCustomHeader + Sync + Send + 'static,
766    {
767        match self.command_custom_header.as_mut() {
768            None => unsafe { hint::unreachable_unchecked() },
769            Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>().unwrap(),
770        }
771    }
772
    /// Returns the custom header as a trait object reference, if one is set.
    pub fn command_custom_header_ref(&self) -> Option<&dyn CommandCustomHeader> {
        match self.command_custom_header.as_ref() {
            None => None,
            // First `as_ref` reaches the boxed header inside the `ArcMut`, the second
            // unwraps the `Box` to the trait object.
            Some(value) => Some(value.as_ref().as_ref()),
        }
    }
779
    /// Returns the custom header as a mutable trait object reference, if one is set.
    pub fn command_custom_header_mut(&mut self) -> Option<&mut dyn CommandCustomHeader> {
        match self.command_custom_header.as_mut() {
            None => None,
            // First `as_mut` reaches the boxed header inside the `ArcMut`, the second
            // unwraps the `Box` to the trait object.
            Some(value) => Some(value.as_mut().as_mut()),
        }
    }
786
    /// Returns the current value of the process-wide request counter and increments it
    /// atomically.
    pub fn create_new_request_id() -> i32 {
        requestId.fetch_add(1, Ordering::AcqRel)
    }
790
791    #[inline]
792    pub fn add_ext_field_if_not_exist(
793        &mut self,
794        key: impl Into<CheetahString>,
795        value: impl Into<CheetahString>,
796    ) {
797        if let Some(ref mut ext) = self.ext_fields {
798            ext.entry(key.into()).or_insert(value.into());
799        }
800    }
801}
802
/// Extracts the header length from the packed header-length field: the low 24 bits of
/// `size` (the top byte carries the serialize type).
pub fn parse_header_length(size: i32) -> usize {
    const HEADER_LENGTH_MASK: i32 = 0x00FF_FFFF;
    let header_length = size & HEADER_LENGTH_MASK;
    header_length as usize
}
806
807pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
808    ((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
809}
810
811pub fn parse_serialize_type(size: i32) -> rocketmq_error::RocketMQResult<SerializeType> {
812    let code = (size >> 24) as u8;
813    match SerializeType::value_of(code) {
814        None => Err(RocketmqError::NotSupportSerializeType(code)),
815        Some(value) => Ok(value),
816    }
817}
818
/// Identity conversion so APIs taking `impl AsRef<RemotingCommand>` accept a command directly.
impl AsRef<RemotingCommand> for RemotingCommand {
    #[inline]
    fn as_ref(&self) -> &RemotingCommand {
        self
    }
}
825
/// Identity conversion so APIs taking `impl AsMut<RemotingCommand>` accept a command directly.
impl AsMut<RemotingCommand> for RemotingCommand {
    #[inline]
    fn as_mut(&mut self) -> &mut RemotingCommand {
        self
    }
}
832
#[cfg(test)]
mod tests {
    use super::*;

    /// The JSON header must use the wire field names and leave out the skipped fields
    /// (body, suspended marker, custom header).
    #[test]
    fn test_remoting_command() {
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_ext_fields(HashMap::new())
            .set_remark_option(Some("remark".to_string()));

        let json = serde_json::to_string(&command).unwrap();
        assert_eq!(
            "{\"code\":1,\"language\":\"JAVA\",\"version\":0,\"opaque\":1,\"flag\":1,\"remark\":\"\
             remark\",\"extFields\":{},\"serializeTypeCurrentRPC\":\"JSON\"}",
            json
        );
    }

    /// With the JSON type the packed value equals the header length for lengths that
    /// fit in 24 bits.
    #[test]
    fn test_mark_serialize_type() {
        for header_length in [261, 16777215] {
            let marked = RemotingCommand::mark_serialize_type(header_length, SerializeType::JSON);
            assert_eq!(marked, header_length);
        }

        // Prints the opaque of four freshly created commands (informational only).
        for _ in 0..4 {
            println!("i={}", RemotingCommand::default().opaque);
        }
    }
}