rocketmq_remoting/protocol/
remoting_command.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::fmt;
20use std::hint;
21use std::sync::atomic::AtomicI32;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::sync::Once;
25
26use bytes::Buf;
27use bytes::BufMut;
28use bytes::Bytes;
29use bytes::BytesMut;
30use cheetah_string::CheetahString;
31use lazy_static::lazy_static;
32use rocketmq_common::common::mq_version::RocketMqVersion;
33#[cfg(not(feature = "simd"))]
34use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
35use rocketmq_common::EnvUtils::EnvUtils;
36use rocketmq_rust::ArcMut;
37use serde::Deserialize;
38use serde::Serialize;
39use tracing::error;
40
41use super::RemotingCommandType;
42use super::SerializeType;
43use crate::code::request_code::RequestCode;
44use crate::code::response_code::RemotingSysResponseCode;
45use crate::protocol::command_custom_header::CommandCustomHeader;
46use crate::protocol::command_custom_header::FromMap;
47use crate::protocol::LanguageCode;
48use crate::rocketmq_serializable::RocketMQSerializable;
49
50pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";
51pub const SERIALIZE_TYPE_ENV: &str = "ROCKETMQ_SERIALIZE_TYPE";
52pub const REMOTING_VERSION_KEY: &str = "rocketmq.remoting.version";
53
54lazy_static! {
55    static ref requestId: Arc<AtomicI32> = Arc::new(AtomicI32::new(0));
56    static ref CONFIG_VERSION: i32 = {
57        EnvUtils::get_property(REMOTING_VERSION_KEY)
58            .unwrap_or(String::from("0"))
59            .parse::<i32>()
60            .unwrap_or(0)
61    };
62    static ref INIT: Once = Once::new();
63    pub static ref SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER: SerializeType = {
64        let protocol = std::env::var(SERIALIZE_TYPE_PROPERTY).unwrap_or_else(|_| {
65            std::env::var(SERIALIZE_TYPE_ENV).unwrap_or_else(|_| "".to_string())
66        });
67        match protocol.as_str() {
68            "JSON" => SerializeType::JSON,
69            "ROCKETMQ" => SerializeType::ROCKETMQ,
70            _ => SerializeType::JSON,
71        }
72    };
73}
74
75fn set_cmd_version(cmd: &mut RemotingCommand) {
76    cmd.set_version_ref(*CONFIG_VERSION);
77}
78
/// A single remoting frame: the protocol header fields plus an optional binary body.
///
/// Only the header fields take part in serde (de)serialization; `body`, `suspended` and
/// `command_custom_header` are marked `#[serde(skip)]`.
#[derive(Serialize, Deserialize)]
pub struct RemotingCommand {
    /// Request or response code.
    code: i32,
    /// Implementation language of the peer that created the command.
    language: LanguageCode,
    /// Protocol version carried in the header.
    version: i32,
    /// Identifier taken from the process-wide request counter when the command is created.
    opaque: i32,

    /// Bit flags (`00`).
    ///
    /// The lowest bit marks a response command: non-zero means response, `0` means request.
    /// The second bit marks a one-way request: non-zero means one-way.
    flag: i32,
    /// Optional free-form remark.
    remark: Option<CheetahString>,

    /// Extension fields; the custom header is merged into this map before encoding.
    #[serde(rename = "extFields")]
    ext_fields: Option<HashMap<CheetahString, CheetahString>>,

    /// Payload bytes; excluded from the serde header.
    #[serde(skip)]
    body: Option<Bytes>,
    /// Local-only flag; excluded from the serde header.
    #[serde(skip)]
    suspended: bool,
    /// Typed custom header; excluded from the serde header.
    #[serde(skip)]
    command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
    /// Serialization type used for this command's header.
    #[serde(rename = "serializeTypeCurrentRPC")]
    serialize_type: SerializeType,
}
106
107impl Clone for RemotingCommand {
108    fn clone(&self) -> Self {
109        Self {
110            code: self.code,
111            language: self.language,
112            version: self.version,
113            opaque: self.opaque,
114            flag: self.flag,
115            remark: self.remark.clone(),
116            ext_fields: self.ext_fields.clone(),
117            body: self.body.clone(),
118            suspended: self.suspended,
119            command_custom_header: self.command_custom_header.clone(),
120            serialize_type: self.serialize_type,
121        }
122    }
123}
124
125impl fmt::Display for RemotingCommand {
126    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127        write!(
128            f,
129            "RemotingCommand [code={}, language={}, version={}, opaque={}, flag(B)={:b}, \
130             remark={}, extFields={:?}, serializeTypeCurrentRPC={}]",
131            self.code,
132            self.language,
133            self.version,
134            self.opaque,
135            self.flag,
136            self.remark.as_ref().unwrap_or(&CheetahString::default()),
137            self.ext_fields,
138            self.serialize_type
139        )
140    }
141}
142
143impl Default for RemotingCommand {
144    fn default() -> Self {
145        let opaque = requestId.fetch_add(1, Ordering::AcqRel);
146        RemotingCommand {
147            code: 0,
148            language: LanguageCode::RUST, // Replace with your actual enum variant
149            version: 0,
150            opaque,
151            flag: 0,
152            remark: None,
153            ext_fields: None,
154            body: None,
155            suspended: false,
156            command_custom_header: None,
157            serialize_type: *SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER,
158        }
159    }
160}
161
162impl RemotingCommand {
163    pub(crate) const RPC_ONEWAY: i32 = 1;
164    pub(crate) const RPC_TYPE: i32 = 0;
165}
166
167impl RemotingCommand {
168    pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
169        Self::default().set_code(code).set_body(body)
170    }
171
172    pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
173    where
174        T: CommandCustomHeader + Sync + Send + 'static,
175    {
176        let mut command = Self::default()
177            .set_code(code.into())
178            .set_command_custom_header(header);
179        set_cmd_version(&mut command);
180        command
181    }
182
183    pub fn create_remoting_command(code: impl Into<i32>) -> Self {
184        let command = Self::default();
185        command.set_code(code.into())
186    }
187
188    pub fn get_and_add() -> i32 {
189        requestId.fetch_add(1, Ordering::AcqRel)
190    }
191
192    pub fn create_response_command_with_code(code: impl Into<i32>) -> Self {
193        Self::default().set_code(code).mark_response_type()
194    }
195
196    pub fn create_response_command_with_code_remark(
197        code: impl Into<i32>,
198        remark: impl Into<CheetahString>,
199    ) -> Self {
200        Self::default()
201            .set_code(code)
202            .set_remark_option(Some(remark.into()))
203            .mark_response_type()
204    }
205
206    pub fn create_response_command() -> Self {
207        Self::default()
208            .set_code(RemotingSysResponseCode::Success)
209            .mark_response_type()
210    }
211
212    pub fn create_response_command_with_header(
213        header: impl CommandCustomHeader + Sync + Send + 'static,
214    ) -> Self {
215        Self::default()
216            .set_code(RemotingSysResponseCode::Success)
217            .set_command_custom_header(header)
218            .mark_response_type()
219    }
220
221    pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
222    where
223        T: CommandCustomHeader + Sync + Send + 'static,
224    {
225        self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
226        self
227    }
228
229    pub fn set_command_custom_header_origin(
230        mut self,
231        command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
232    ) -> Self {
233        self.command_custom_header = command_custom_header;
234        self
235    }
236
237    pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
238    where
239        T: CommandCustomHeader + Sync + Send + 'static,
240    {
241        self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
242    }
243
244    pub fn set_code(mut self, code: impl Into<i32>) -> Self {
245        self.code = code.into();
246        self
247    }
248
249    pub fn set_code_ref(&mut self, code: impl Into<i32>) {
250        self.code = code.into();
251    }
252
253    pub fn set_code_mut(&mut self, code: impl Into<i32>) -> &mut Self {
254        self.code = code.into();
255        self
256    }
257
258    pub fn set_language(mut self, language: LanguageCode) -> Self {
259        self.language = language;
260        self
261    }
262
263    pub fn set_version_ref(&mut self, version: i32) {
264        self.version = version;
265    }
266
267    pub fn set_version(mut self, version: i32) -> Self {
268        self.version = version;
269        self
270    }
271
272    #[inline]
273    pub fn set_opaque(mut self, opaque: i32) -> Self {
274        self.opaque = opaque;
275        self
276    }
277
278    #[inline]
279    pub fn set_opaque_mut(&mut self, opaque: i32) {
280        self.opaque = opaque;
281    }
282
283    #[inline]
284    pub fn set_flag(mut self, flag: i32) -> Self {
285        self.flag = flag;
286        self
287    }
288
289    #[inline]
290    pub fn set_remark_option(mut self, remark: Option<impl Into<CheetahString>>) -> Self {
291        self.remark = remark.map(|item| item.into());
292        self
293    }
294
295    #[inline]
296    pub fn set_remark(mut self, remark: impl Into<CheetahString>) -> Self {
297        self.remark = Some(remark.into());
298        self
299    }
300
301    #[inline]
302    pub fn set_remark_option_mut(&mut self, remark: Option<impl Into<CheetahString>>) {
303        self.remark = remark.map(|item| item.into());
304    }
305
306    #[inline]
307    pub fn set_remark_mut(&mut self, remark: impl Into<CheetahString>) {
308        self.remark = Some(remark.into());
309    }
310
311    #[inline]
312    pub fn set_ext_fields(mut self, ext_fields: HashMap<CheetahString, CheetahString>) -> Self {
313        self.ext_fields = Some(ext_fields);
314        self
315    }
316
317    #[inline]
318    pub fn set_body(mut self, body: impl Into<Bytes>) -> Self {
319        self.body = Some(body.into());
320        self
321    }
322
323    #[inline]
324    pub fn set_body_mut_ref(&mut self, body: impl Into<Bytes>) {
325        self.body = Some(body.into());
326    }
327
328    #[inline]
329    pub fn set_suspended(mut self, suspended: bool) -> Self {
330        self.suspended = suspended;
331        self
332    }
333
334    #[inline]
335    pub fn set_suspended_ref(&mut self, suspended: bool) {
336        self.suspended = suspended;
337    }
338
339    #[inline]
340    pub fn set_serialize_type(mut self, serialize_type: SerializeType) -> Self {
341        self.serialize_type = serialize_type;
342        self
343    }
344
345    #[inline]
346    pub fn mark_response_type(mut self) -> Self {
347        let mark = 1 << Self::RPC_TYPE;
348        self.flag |= mark;
349        self
350    }
351
352    #[inline]
353    pub fn mark_response_type_ref(&mut self) {
354        let mark = 1 << Self::RPC_TYPE;
355        self.flag |= mark;
356    }
357
358    #[inline]
359    pub fn mark_oneway_rpc(mut self) -> Self {
360        let mark = 1 << Self::RPC_ONEWAY;
361        self.flag |= mark;
362        self
363    }
364
365    #[inline]
366    pub fn mark_oneway_rpc_ref(&mut self) {
367        let mark = 1 << Self::RPC_ONEWAY;
368        self.flag |= mark;
369    }
370
371    #[inline]
372    pub fn get_serialize_type(&self) -> SerializeType {
373        self.serialize_type
374    }
375
376    /// Encode header with optimized path selection
377    #[inline]
378    pub fn header_encode(&mut self) -> Option<Bytes> {
379        self.make_custom_header_to_net();
380        match self.serialize_type {
381            SerializeType::ROCKETMQ => {
382                Some(RocketMQSerializable::rocket_mq_protocol_encode_bytes(self))
383            }
384            SerializeType::JSON => {
385                #[cfg(feature = "simd")]
386                {
387                    match simd_json::to_vec(self) {
388                        Ok(value) => Some(Bytes::from(value)),
389                        Err(e) => {
390                            error!("Failed to encode JSON header with simd-json: {}", e);
391                            None
392                        }
393                    }
394                }
395                #[cfg(not(feature = "simd"))]
396                {
397                    match serde_json::to_vec(self) {
398                        Ok(value) => Some(Bytes::from(value)),
399                        Err(e) => {
400                            error!("Failed to encode JSON header: {}", e);
401                            None
402                        }
403                    }
404                }
405            }
406        }
407    }
408
409    /// Encode header with body length information
410    #[inline]
411    pub fn encode_header(&mut self) -> Option<Bytes> {
412        let body_length = self.body.as_ref().map_or(0, |b| b.len());
413        self.encode_header_with_body_length(body_length)
414    }
415
416    /// Optimized header encoding with pre-calculated capacity
417    #[inline]
418    pub fn encode_header_with_body_length(&mut self, body_length: usize) -> Option<Bytes> {
419        // Encode header data
420        let header_data = self.header_encode()?;
421        let header_len = header_data.len();
422
423        // Calculate frame size: 4 (total_length) + 4 (serialize_type) + header_len
424        let frame_header_size = 8;
425        let total_length = 4 + header_len + body_length; // 4 is for serialize_type field
426
427        // Allocate exact capacity
428        let mut result = BytesMut::with_capacity(frame_header_size + header_len);
429
430        // Write total length
431        result.put_i32(total_length as i32);
432
433        // Write serialize type with embedded header length
434        result.put_i32(mark_protocol_type(header_len as i32, self.serialize_type));
435
436        // Write header data
437        result.put(header_data);
438
439        Some(result.freeze())
440    }
441
442    /// Convert custom header to network format (merge into ext_fields)
443    #[inline]
444    pub fn make_custom_header_to_net(&mut self) {
445        if let Some(header) = &self.command_custom_header {
446            if let Some(header_map) = header.to_map() {
447                match &mut self.ext_fields {
448                    None => {
449                        self.ext_fields = Some(header_map);
450                    }
451                    Some(ext) => {
452                        // Merge header map into existing ext_fields
453                        for (key, value) in header_map {
454                            ext.insert(key, value);
455                        }
456                    }
457                }
458            }
459        }
460    }
461
462    #[inline]
463    pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
464        match self.serialize_type {
465            SerializeType::JSON => {
466                self.fast_encode_json(dst);
467            }
468            SerializeType::ROCKETMQ => {
469                self.fast_encode_rocketmq(dst);
470            }
471        }
472    }
473
474    /// Optimized JSON encoding with pre-calculated capacity and zero-copy optimizations
475    #[inline]
476    fn fast_encode_json(&mut self, dst: &mut BytesMut) {
477        self.make_custom_header_to_net();
478
479        // Pre-calculate approximate header size to reduce reallocations
480        let estimated_header_size = self.estimate_json_header_size();
481        let body_length = self.body.as_ref().map_or(0, |b| b.len());
482
483        // Reserve space upfront: 4 (total_length) + 4 (serialize_type) + estimated_header + body
484        dst.reserve(8 + estimated_header_size + body_length);
485
486        // Encode header using simd-json for better performance when available
487        #[cfg(feature = "simd")]
488        let encode_result = simd_json::to_vec(self);
489
490        #[cfg(not(feature = "simd"))]
491        let encode_result = serde_json::to_vec(self);
492
493        match encode_result {
494            Ok(header_bytes) => {
495                let header_length = header_bytes.len() as i32;
496                let body_length = body_length as i32;
497                let total_length = 4 + header_length + body_length;
498
499                // Write frame header
500                dst.put_i32(total_length);
501                dst.put_i32(RemotingCommand::mark_serialize_type(
502                    header_length,
503                    SerializeType::JSON,
504                ));
505
506                // Write header bytes (zero-copy from Vec)
507                dst.put_slice(&header_bytes);
508            }
509            Err(e) => {
510                error!("Failed to encode JSON header: {}", e);
511                // Write minimal error frame
512                dst.put_i32(4); // total_length: just the serialize_type field
513                dst.put_i32(RemotingCommand::mark_serialize_type(0, SerializeType::JSON));
514            }
515        }
516    }
517
518    /// Optimized ROCKETMQ binary encoding with minimal allocations
519    #[inline]
520    fn fast_encode_rocketmq(&mut self, dst: &mut BytesMut) {
521        let begin_index = dst.len();
522
523        // Reserve space for total_length (4 bytes) and serialize_type (4 bytes)
524        dst.reserve(8);
525        dst.put_i64(0); // Placeholder for total_length + serialize_type
526
527        // Check if custom header supports fast codec
528        if let Some(header) = self.command_custom_header_ref() {
529            if !header.support_fast_codec() {
530                self.make_custom_header_to_net();
531            }
532        }
533
534        // Encode header directly to buffer
535        let header_size = RocketMQSerializable::rocketmq_protocol_encode(self, dst);
536        let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
537
538        // Calculate serialize type with header length embedded
539        let serialize_type =
540            RemotingCommand::mark_serialize_type(header_size as i32, SerializeType::ROCKETMQ);
541
542        // Write total_length and serialize_type at the beginning (in-place update)
543        let total_length = (header_size as i32 + body_length).to_be_bytes();
544        let serialize_type_bytes = serialize_type.to_be_bytes();
545
546        dst[begin_index..begin_index + 4].copy_from_slice(&total_length);
547        dst[begin_index + 4..begin_index + 8].copy_from_slice(&serialize_type_bytes);
548    }
549
550    /// Estimate JSON header size to reduce buffer reallocations
551    /// This is an approximation based on typical field sizes
552    #[inline]
553    fn estimate_json_header_size(&self) -> usize {
554        let mut size = 100; // Base JSON overhead
555
556        if let Some(ref remark) = self.remark {
557            size += remark.len() + 20; // "remark":"..." + quotes
558        }
559
560        if let Some(ref ext) = self.ext_fields {
561            // Approximate: each entry adds ~30 bytes overhead + key/value lengths
562            size += ext
563                .iter()
564                .map(|(k, v)| k.len() + v.len() + 30)
565                .sum::<usize>();
566        }
567
568        size
569    }
570
571    /// Optimized decode with enhanced boundary checks and reduced allocations
572    #[inline]
573    pub fn decode(src: &mut BytesMut) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
574        const FRAME_HEADER_SIZE: usize = 4;
575        const SERIALIZE_TYPE_SIZE: usize = 4;
576        const MIN_PAYLOAD_SIZE: usize = SERIALIZE_TYPE_SIZE; // Minimum: just serialize_type field
577
578        let available = src.len();
579
580        // Early return if not enough data for frame header
581        if available < FRAME_HEADER_SIZE {
582            return Ok(None);
583        }
584
585        // Read total size without advancing the buffer (peek)
586        let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
587
588        // Validate total_size to prevent overflow attacks (check max first)
589        if total_size > 16 * 1024 * 1024 {
590            return Err(rocketmq_error::RocketMQError::Serialization(
591                rocketmq_error::SerializationError::DecodeFailed {
592                    format: "remoting_command",
593                    message: format!("Frame size {total_size} exceeds maximum allowed (16MB)"),
594                },
595            ));
596        }
597
598        // Wait for complete frame
599        let full_frame_size = total_size + FRAME_HEADER_SIZE;
600        if available < full_frame_size {
601            return Ok(None);
602        }
603
604        // Now validate minimum total_size (we have the complete frame)
605        if total_size < MIN_PAYLOAD_SIZE {
606            return Err(rocketmq_error::RocketMQError::Serialization(
607                rocketmq_error::SerializationError::DecodeFailed {
608                    format: "remoting_command",
609                    message: format!(
610                        "Invalid total_size {total_size}, minimum required is {MIN_PAYLOAD_SIZE}"
611                    ),
612                },
613            ));
614        }
615
616        // Extract complete frame (zero-copy split)
617        let mut cmd_data = src.split_to(full_frame_size);
618        cmd_data.advance(FRAME_HEADER_SIZE); // Skip total_size field
619
620        // Ensure we have serialize_type field (should always pass after above checks)
621        if cmd_data.remaining() < SERIALIZE_TYPE_SIZE {
622            return Err(rocketmq_error::RocketMQError::Serialization(
623                rocketmq_error::SerializationError::DecodeFailed {
624                    format: "remoting_command",
625                    message: "Incomplete serialize_type field".to_string(),
626                },
627            ));
628        }
629
630        // Parse header length and protocol type
631        let ori_header_length = cmd_data.get_i32();
632        let header_length = parse_header_length(ori_header_length);
633
634        // Validate header length
635        if header_length > total_size - SERIALIZE_TYPE_SIZE {
636            return Err(rocketmq_error::RocketMQError::Serialization(
637                rocketmq_error::SerializationError::DecodeFailed {
638                    format: "remoting_command",
639                    message: format!(
640                        "Invalid header length {header_length}, total size {total_size}"
641                    ),
642                },
643            ));
644        }
645
646        let protocol_type = parse_serialize_type(ori_header_length)?;
647
648        // Split header and body (zero-copy)
649        let mut header_data = cmd_data.split_to(header_length);
650
651        // Decode header
652        let mut cmd =
653            RemotingCommand::header_decode(&mut header_data, header_length, protocol_type)?;
654
655        // Attach body if present (zero-copy freeze)
656        if let Some(ref mut cmd) = cmd {
657            let body_length = total_size - SERIALIZE_TYPE_SIZE - header_length;
658            if body_length > 0 {
659                if cmd_data.remaining() >= body_length {
660                    cmd.set_body_mut_ref(cmd_data.split_to(body_length).freeze());
661                } else {
662                    return Err(rocketmq_error::RocketMQError::Serialization(
663                        rocketmq_error::SerializationError::DecodeFailed {
664                            format: "remoting_command",
665                            message: format!(
666                                "Insufficient body data: expected {body_length}, available {}",
667                                cmd_data.remaining()
668                            ),
669                        },
670                    ));
671                }
672            }
673        }
674
675        Ok(cmd)
676    }
677
678    /// Optimized header decoding with type-based dispatch
679    #[inline]
680    pub fn header_decode(
681        src: &mut BytesMut,
682        header_length: usize,
683        type_: SerializeType,
684    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
685        match type_ {
686            SerializeType::JSON => {
687                // Deserialize JSON header using simd-json when available
688                #[cfg(feature = "simd")]
689                let cmd = {
690                    let mut slice = src.split_to(header_length).to_vec();
691                    simd_json::from_slice::<RemotingCommand>(&mut slice).map_err(|error| {
692                        rocketmq_error::RocketMQError::Serialization(
693                            rocketmq_error::SerializationError::DecodeFailed {
694                                format: "json",
695                                message: format!("SIMD JSON deserialization error: {error}"),
696                            },
697                        )
698                    })?
699                };
700
701                #[cfg(not(feature = "simd"))]
702                let cmd =
703                    SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
704                        rocketmq_error::RocketMQError::Serialization(
705                            rocketmq_error::SerializationError::DecodeFailed {
706                                format: "json",
707                                message: format!("JSON deserialization error: {error}"),
708                            },
709                        )
710                    })?;
711
712                Ok(Some(cmd.set_serialize_type(SerializeType::JSON)))
713            }
714            SerializeType::ROCKETMQ => {
715                // Deserialize binary header
716                let cmd = RocketMQSerializable::rocket_mq_protocol_decode(src, header_length)?;
717                Ok(Some(cmd.set_serialize_type(SerializeType::ROCKETMQ)))
718            }
719        }
720    }
721
722    #[inline]
723    pub fn get_body(&self) -> Option<&Bytes> {
724        self.body.as_ref()
725    }
726
727    #[inline]
728    pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
729        self.body.as_mut()
730    }
731
732    #[inline]
733    pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
734        ((protocol_type.get_code() as i32) << 24) | (header_length & 0x00FFFFFF)
735    }
736
737    #[inline]
738    pub fn code(&self) -> i32 {
739        self.code
740    }
741
742    #[inline]
743    pub fn request_code(&self) -> RequestCode {
744        RequestCode::from(self.code)
745    }
746
747    #[inline]
748    pub fn code_ref(&self) -> &i32 {
749        &self.code
750    }
751
752    #[inline]
753    pub fn language(&self) -> LanguageCode {
754        self.language
755    }
756
757    #[inline]
758    pub fn version(&self) -> i32 {
759        self.version
760    }
761
762    pub fn rocketmq_version(&self) -> RocketMqVersion {
763        RocketMqVersion::from_ordinal(self.version as u32)
764    }
765
766    #[inline]
767    pub fn opaque(&self) -> i32 {
768        self.opaque
769    }
770
771    #[inline]
772    pub fn flag(&self) -> i32 {
773        self.flag
774    }
775
776    #[inline]
777    pub fn remark(&self) -> Option<&CheetahString> {
778        self.remark.as_ref()
779    }
780
781    #[inline]
782    pub fn ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
783        self.ext_fields.as_ref()
784    }
785
786    #[inline]
787    pub fn body(&self) -> Option<&Bytes> {
788        self.body.as_ref()
789    }
790
791    #[inline]
792    pub fn take_body(&mut self) -> Option<Bytes> {
793        self.body.take()
794    }
795
796    #[inline]
797    pub fn suspended(&self) -> bool {
798        self.suspended
799    }
800
801    #[inline]
802    pub fn serialize_type(&self) -> SerializeType {
803        self.serialize_type
804    }
805
806    pub fn decode_command_custom_header<T>(&self) -> rocketmq_error::RocketMQResult<T>
807    where
808        T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
809    {
810        match self.ext_fields {
811            None => Err(rocketmq_error::RocketMQError::Serialization(
812                rocketmq_error::SerializationError::DecodeFailed {
813                    format: "header",
814                    message: "ExtFields is None".to_string(),
815                },
816            )),
817            Some(ref header) => T::from(header),
818        }
819    }
820
821    pub fn decode_command_custom_header_fast<T>(&self) -> rocketmq_error::RocketMQResult<T>
822    where
823        T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
824        T: Default + CommandCustomHeader,
825    {
826        match self.ext_fields {
827            None => Err(rocketmq_error::RocketMQError::Serialization(
828                rocketmq_error::SerializationError::DecodeFailed {
829                    format: "header",
830                    message: "ExtFields is None".to_string(),
831                },
832            )),
833            Some(ref header) => {
834                let mut target = T::default();
835                if target.support_fast_codec() {
836                    target.decode_fast(header)?;
837                    Ok(target)
838                } else {
839                    T::from(header)
840                }
841            }
842        }
843    }
844
845    #[inline]
846    pub fn is_response_type(&self) -> bool {
847        let bits = 1 << Self::RPC_TYPE;
848        (self.flag & bits) == bits
849    }
850
851    #[inline]
852    pub fn is_oneway_rpc(&self) -> bool {
853        let bits = 1 << Self::RPC_ONEWAY;
854        (self.flag & bits) == bits
855    }
856
857    pub fn get_type(&self) -> RemotingCommandType {
858        if self.is_response_type() {
859            RemotingCommandType::RESPONSE
860        } else {
861            RemotingCommandType::REQUEST
862        }
863    }
864
865    #[inline]
866    pub fn with_opaque(&mut self, opaque: i32) -> &mut Self {
867        self.opaque = opaque;
868        self
869    }
870
871    pub fn add_ext_field(
872        &mut self,
873        key: impl Into<CheetahString>,
874        value: impl Into<CheetahString>,
875    ) -> &mut Self {
876        if let Some(ref mut ext) = self.ext_fields {
877            ext.insert(key.into(), value.into());
878        }
879        self
880    }
881
882    #[inline]
883    pub fn with_code(&mut self, code: impl Into<i32>) -> &mut Self {
884        self.code = code.into();
885        self
886    }
887
888    #[inline]
889    pub fn with_remark(&mut self, remark: impl Into<CheetahString>) -> &mut Self {
890        self.remark = Some(remark.into());
891        self
892    }
893
894    #[inline]
895    pub fn get_ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
896        self.ext_fields.as_ref()
897    }
898
899    pub fn read_custom_header_ref<T>(&self) -> Option<&T>
900    where
901        T: CommandCustomHeader + Sync + Send + 'static,
902    {
903        match self.command_custom_header.as_ref() {
904            None => None,
905            Some(value) => value.as_ref().as_any().downcast_ref::<T>(),
906        }
907    }
908
909    pub fn read_custom_header_ref_unchecked<T>(&self) -> &T
910    where
911        T: CommandCustomHeader + Sync + Send + 'static,
912    {
913        match self.command_custom_header.as_ref() {
914            None => unsafe { hint::unreachable_unchecked() },
915            Some(value) => value.as_ref().as_any().downcast_ref::<T>().unwrap(),
916        }
917    }
918
919    pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
920    where
921        T: CommandCustomHeader + Sync + Send + 'static,
922    {
923        match self.command_custom_header.as_mut() {
924            None => None,
925            Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>(),
926        }
927    }
928
929    pub fn read_custom_header_mut_from_ref<T>(&self) -> Option<&mut T>
930    where
931        T: CommandCustomHeader + Sync + Send + 'static,
932    {
933        match self.command_custom_header.as_ref() {
934            None => None,
935            Some(value) => value.mut_from_ref().as_any_mut().downcast_mut::<T>(),
936        }
937    }
938
939    pub fn read_custom_header_mut_unchecked<T>(&mut self) -> &mut T
940    where
941        T: CommandCustomHeader + Sync + Send + 'static,
942    {
943        match self.command_custom_header.as_mut() {
944            None => unsafe { hint::unreachable_unchecked() },
945            Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>().unwrap(),
946        }
947    }
948
949    pub fn command_custom_header_ref(&self) -> Option<&dyn CommandCustomHeader> {
950        match self.command_custom_header.as_ref() {
951            None => None,
952            Some(value) => Some(value.as_ref().as_ref()),
953        }
954    }
955
956    pub fn command_custom_header_mut(&mut self) -> Option<&mut dyn CommandCustomHeader> {
957        match self.command_custom_header.as_mut() {
958            None => None,
959            Some(value) => Some(value.as_mut().as_mut()),
960        }
961    }
962
963    pub fn create_new_request_id() -> i32 {
964        requestId.fetch_add(1, Ordering::AcqRel)
965    }
966
967    #[inline]
968    pub fn add_ext_field_if_not_exist(
969        &mut self,
970        key: impl Into<CheetahString>,
971        value: impl Into<CheetahString>,
972    ) {
973        if let Some(ref mut ext) = self.ext_fields {
974            ext.entry(key.into()).or_insert(value.into());
975        }
976    }
977}
978
/// Extracts the header length from the combined serialize-type/length field.
///
/// The length occupies the low 24 bits of `size`; the high byte is discarded.
#[inline]
pub fn parse_header_length(size: i32) -> usize {
    const HEADER_LENGTH_MASK: i32 = 0x00FF_FFFF;
    let length = size & HEADER_LENGTH_MASK;
    length as usize
}
984
985/// Combine serialize type code with header length
986#[inline]
987pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
988    ((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
989}
990
991/// Extract serialize type from the combined field
992#[inline]
993pub fn parse_serialize_type(size: i32) -> rocketmq_error::RocketMQResult<SerializeType> {
994    let code = (size >> 24) as u8;
995    SerializeType::value_of(code).ok_or({
996        rocketmq_error::RocketMQError::Protocol(
997            rocketmq_error::ProtocolError::UnsupportedSerializationType {
998                serialize_type: code,
999            },
1000        )
1001    })
1002}
1003
1004impl AsRef<RemotingCommand> for RemotingCommand {
1005    #[inline]
1006    fn as_ref(&self) -> &RemotingCommand {
1007        self
1008    }
1009}
1010
1011impl AsMut<RemotingCommand> for RemotingCommand {
1012    #[inline]
1013    fn as_mut(&mut self) -> &mut RemotingCommand {
1014        self
1015    }
1016}
1017
#[cfg(test)]
mod tests {
    use super::*;

    /// A populated command serializes to the expected JSON layout.
    #[test]
    fn test_remoting_command() {
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_ext_fields(HashMap::new())
            .set_remark_option(Some("remark".to_string()));

        assert_eq!(
            "{\"code\":1,\"language\":\"JAVA\",\"version\":0,\"opaque\":1,\"flag\":1,\"remark\":\"\
             remark\",\"extFields\":{},\"serializeTypeCurrentRPC\":\"JSON\"}",
            serde_json::to_string(&command).unwrap()
        );
    }

    /// Marking a length with the JSON serialize type leaves the length unchanged.
    #[test]
    fn test_mark_serialize_type() {
        let i = RemotingCommand::mark_serialize_type(261, SerializeType::JSON);
        assert_eq!(i, 261);

        let i = RemotingCommand::mark_serialize_type(16777215, SerializeType::JSON);
        assert_eq!(i, 16777215);
    }

    /// Only the low 24 bits of the combined field are reported as header length.
    #[test]
    fn test_parse_header_length() {
        assert_eq!(parse_header_length(0), 0);
        assert_eq!(parse_header_length(261), 261);
        assert_eq!(parse_header_length(0x0100_0105), 0x105);
        assert_eq!(parse_header_length(-1), 0x00FF_FFFF);
    }

    /// A value produced by `mark_protocol_type` can be split back into its two parts.
    #[test]
    fn test_mark_protocol_type_round_trip() {
        let marked = mark_protocol_type(261, SerializeType::JSON);
        assert_eq!(parse_header_length(marked), 261);
        assert!(matches!(
            parse_serialize_type(marked),
            Ok(SerializeType::JSON)
        ));
    }

    /// Bits above the 24-bit length are dropped from `source` when packing.
    #[test]
    fn test_mark_protocol_type_masks_source() {
        let marked = mark_protocol_type(0x7FFF_FFFF, SerializeType::JSON);
        assert_eq!(parse_header_length(marked), 0x00FF_FFFF);
    }

    /// An unknown type code in the high byte is reported as an error.
    #[test]
    fn test_parse_serialize_type_rejects_unknown_code() {
        assert!(parse_serialize_type(0x7F00_0000).is_err());
    }
}