Skip to main content

rocketmq_remoting/protocol/
remoting_command.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::hint;
18use std::sync::atomic::AtomicI32;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::BufMut;
24use bytes::Bytes;
25use bytes::BytesMut;
26use cheetah_string::CheetahString;
27use rocketmq_common::common::mq_version::RocketMqVersion;
28use rocketmq_common::common::mq_version::CURRENT_VERSION;
29#[cfg(not(feature = "simd"))]
30use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
31use rocketmq_common::EnvUtils::EnvUtils;
32use rocketmq_rust::ArcMut;
33use serde::Deserialize;
34use serde::Serialize;
35use tracing::error;
36
37use super::RemotingCommandType;
38use super::SerializeType;
39use crate::code::request_code::RequestCode;
40use crate::code::response_code::RemotingSysResponseCode;
41use crate::protocol::command_custom_header::CommandCustomHeader;
42use crate::protocol::command_custom_header::FromMap;
43use crate::protocol::LanguageCode;
44use crate::rocketmq_serializable::RocketMQSerializable;
45
/// Primary variable name looked up when choosing the default serialization type
/// (see `SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER`).
pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";
/// Fallback environment variable consulted when `SERIALIZE_TYPE_PROPERTY` is not set.
pub const SERIALIZE_TYPE_ENV: &str = "ROCKETMQ_SERIALIZE_TYPE";
/// Property key used to override the protocol version stamped on request commands.
pub const REMOTING_VERSION_KEY: &str = "rocketmq.remoting.version";
49
/// Process-wide counter that hands out `opaque` values; it starts at 0 and is
/// advanced with `fetch_add(1, ..)` every time a command is default-constructed.
static REQUEST_ID: std::sync::LazyLock<Arc<AtomicI32>> = std::sync::LazyLock::new(|| Arc::new(AtomicI32::new(0)));
51
52static CONFIG_VERSION: std::sync::LazyLock<i32> = std::sync::LazyLock::new(|| {
53    EnvUtils::get_property(REMOTING_VERSION_KEY)
54        .unwrap_or_else(|| (CURRENT_VERSION as i32).to_string())
55        .parse::<i32>()
56        .unwrap_or(CURRENT_VERSION as i32)
57});
58
59pub static SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER: std::sync::LazyLock<SerializeType> = std::sync::LazyLock::new(|| {
60    let protocol = std::env::var(SERIALIZE_TYPE_PROPERTY)
61        .unwrap_or_else(|_| std::env::var(SERIALIZE_TYPE_ENV).unwrap_or_else(|_| "".to_string()));
62    match protocol.as_str() {
63        "JSON" => SerializeType::JSON,
64        "ROCKETMQ" => SerializeType::ROCKETMQ,
65        _ => SerializeType::JSON,
66    }
67});
68
69fn set_cmd_version(cmd: &mut RemotingCommand) {
70    cmd.set_version_ref(*CONFIG_VERSION);
71}
72
/// A single remoting protocol command (request or response).
///
/// The serde derive covers the header fields only; `body`, `suspended` and
/// `command_custom_header` are marked `#[serde(skip)]` and therefore never appear
/// in the serialized header.
#[derive(Serialize, Deserialize)]
pub struct RemotingCommand {
    /// Request or response code.
    code: i32,
    /// Language of the peer that produced the command.
    language: LanguageCode,
    /// Protocol version number.
    version: i32,
    /// Correlation id; `Default` draws it from the process-wide `REQUEST_ID` counter.
    opaque: i32,

    /// Bit flags.
    ///
    /// The lowest bit of the flag indicates whether it is a response command.
    /// Non-zero indicates a response command, while 0 indicates a request command.
    /// The second bit indicates whether it is a one-way request.
    /// Non-zero indicates a one-way request.
    flag: i32,
    /// Optional human-readable remark.
    remark: Option<CheetahString>,

    /// Extension fields; the custom header is merged into this map before encoding.
    #[serde(rename = "extFields")]
    ext_fields: Option<HashMap<CheetahString, CheetahString>>,

    /// Payload bytes; not part of the serialized header.
    #[serde(skip)]
    body: Option<Bytes>,
    /// Local-only flag; not serialized.
    #[serde(skip)]
    suspended: bool,
    /// Typed custom header; not serialized directly.
    #[serde(skip)]
    command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
    /// Serialization type used for this command's header.
    #[serde(rename = "serializeTypeCurrentRPC")]
    serialize_type: SerializeType,
}
100
101impl Clone for RemotingCommand {
102    fn clone(&self) -> Self {
103        Self {
104            code: self.code,
105            language: self.language,
106            version: self.version,
107            opaque: self.opaque,
108            flag: self.flag,
109            remark: self.remark.clone(),
110            ext_fields: self.ext_fields.clone(),
111            body: self.body.clone(),
112            suspended: self.suspended,
113            command_custom_header: self.command_custom_header.clone(),
114            serialize_type: self.serialize_type,
115        }
116    }
117}
118
119impl fmt::Display for RemotingCommand {
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        write!(
122            f,
123            "RemotingCommand [code={}, language={}, version={}, opaque={}, flag(B)={:b}, remark={}, extFields={:?}, \
124             serializeTypeCurrentRPC={}]",
125            self.code,
126            self.language,
127            self.version,
128            self.opaque,
129            self.flag,
130            self.remark.as_ref().unwrap_or(&CheetahString::default()),
131            self.ext_fields,
132            self.serialize_type
133        )
134    }
135}
136
137impl Default for RemotingCommand {
138    fn default() -> Self {
139        let opaque = REQUEST_ID.fetch_add(1, Ordering::AcqRel);
140        RemotingCommand {
141            code: 0,
142            language: LanguageCode::RUST, // Replace with your actual enum variant
143            version: 0,
144            opaque,
145            flag: 0,
146            remark: None,
147            ext_fields: None,
148            body: None,
149            suspended: false,
150            command_custom_header: None,
151            serialize_type: *SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER,
152        }
153    }
154}
155
impl RemotingCommand {
    /// Bit index (not mask) of the one-way marker inside `flag`.
    pub(crate) const RPC_ONEWAY: i32 = 1;
    /// Bit index (not mask) of the response marker inside `flag`.
    pub(crate) const RPC_TYPE: i32 = 0;
}
160
161impl RemotingCommand {
162    pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
163        Self::default().set_code(code).set_body(body)
164    }
165
166    pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
167    where
168        T: CommandCustomHeader + Sync + Send + 'static,
169    {
170        let mut command = Self::default().set_code(code.into()).set_command_custom_header(header);
171        set_cmd_version(&mut command);
172        command
173    }
174
175    pub fn create_remoting_command(code: impl Into<i32>) -> Self {
176        let command = Self::default();
177        command.set_code(code.into())
178    }
179
180    pub fn get_and_add() -> i32 {
181        REQUEST_ID.fetch_add(1, Ordering::AcqRel)
182    }
183
184    pub fn create_response_command_with_code(code: impl Into<i32>) -> Self {
185        Self::default().set_code(code).mark_response_type()
186    }
187
188    pub fn create_response_command_with_code_remark(code: impl Into<i32>, remark: impl Into<CheetahString>) -> Self {
189        Self::default()
190            .set_code(code)
191            .set_remark_option(Some(remark.into()))
192            .mark_response_type()
193    }
194
195    pub fn create_response_command() -> Self {
196        Self::default()
197            .set_code(RemotingSysResponseCode::Success)
198            .mark_response_type()
199    }
200
201    pub fn create_response_command_with_header(header: impl CommandCustomHeader + Sync + Send + 'static) -> Self {
202        Self::default()
203            .set_code(RemotingSysResponseCode::Success)
204            .set_command_custom_header(header)
205            .mark_response_type()
206    }
207
208    pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
209    where
210        T: CommandCustomHeader + Sync + Send + 'static,
211    {
212        self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
213        self
214    }
215
216    pub fn set_command_custom_header_origin(
217        mut self,
218        command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
219    ) -> Self {
220        self.command_custom_header = command_custom_header;
221        self
222    }
223
    /// In-place setter: wraps `command_custom_header` in `ArcMut<Box<..>>` and stores it,
    /// replacing any previous header.
    pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
    where
        T: CommandCustomHeader + Sync + Send + 'static,
    {
        self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
    }
230
231    pub fn set_code(mut self, code: impl Into<i32>) -> Self {
232        self.code = code.into();
233        self
234    }
235
    /// In-place setter for the command code.
    pub fn set_code_ref(&mut self, code: impl Into<i32>) {
        self.code = code.into();
    }
239
240    pub fn set_code_mut(&mut self, code: impl Into<i32>) -> &mut Self {
241        self.code = code.into();
242        self
243    }
244
245    pub fn set_language(mut self, language: LanguageCode) -> Self {
246        self.language = language;
247        self
248    }
249
    /// In-place setter for the protocol version.
    pub fn set_version_ref(&mut self, version: i32) {
        self.version = version;
    }
253
254    pub fn set_version(mut self, version: i32) -> Self {
255        self.version = version;
256        self
257    }
258
259    #[inline]
260    pub fn set_opaque(mut self, opaque: i32) -> Self {
261        self.opaque = opaque;
262        self
263    }
264
    /// In-place setter for the correlation id.
    #[inline]
    pub fn set_opaque_mut(&mut self, opaque: i32) {
        self.opaque = opaque;
    }
269
270    #[inline]
271    pub fn set_flag(mut self, flag: i32) -> Self {
272        self.flag = flag;
273        self
274    }
275
276    #[inline]
277    pub fn set_remark_option(mut self, remark: Option<impl Into<CheetahString>>) -> Self {
278        self.remark = remark.map(|item| item.into());
279        self
280    }
281
282    #[inline]
283    pub fn set_remark(mut self, remark: impl Into<CheetahString>) -> Self {
284        self.remark = Some(remark.into());
285        self
286    }
287
    /// In-place setter for the remark; `None` clears it.
    #[inline]
    pub fn set_remark_option_mut(&mut self, remark: Option<impl Into<CheetahString>>) {
        self.remark = remark.map(|item| item.into());
    }
292
    /// In-place setter for the remark.
    #[inline]
    pub fn set_remark_mut(&mut self, remark: impl Into<CheetahString>) {
        self.remark = Some(remark.into());
    }
297
298    #[inline]
299    pub fn set_ext_fields(mut self, ext_fields: HashMap<CheetahString, CheetahString>) -> Self {
300        self.ext_fields = Some(ext_fields);
301        self
302    }
303
304    #[inline]
305    pub fn set_body(mut self, body: impl Into<Bytes>) -> Self {
306        self.body = Some(body.into());
307        self
308    }
309
    /// In-place setter for the body payload.
    #[inline]
    pub fn set_body_mut_ref(&mut self, body: impl Into<Bytes>) {
        self.body = Some(body.into());
    }
314
315    #[inline]
316    pub fn set_suspended(mut self, suspended: bool) -> Self {
317        self.suspended = suspended;
318        self
319    }
320
    /// In-place setter for the `suspended` flag.
    #[inline]
    pub fn set_suspended_ref(&mut self, suspended: bool) {
        self.suspended = suspended;
    }
325
326    #[inline]
327    pub fn set_serialize_type(mut self, serialize_type: SerializeType) -> Self {
328        self.serialize_type = serialize_type;
329        self
330    }
331
332    #[inline]
333    pub fn mark_response_type(mut self) -> Self {
334        let mark = 1 << Self::RPC_TYPE;
335        self.flag |= mark;
336        self
337    }
338
339    #[inline]
340    pub fn mark_response_type_ref(&mut self) {
341        let mark = 1 << Self::RPC_TYPE;
342        self.flag |= mark;
343    }
344
345    #[inline]
346    pub fn mark_oneway_rpc(mut self) -> Self {
347        let mark = 1 << Self::RPC_ONEWAY;
348        self.flag |= mark;
349        self
350    }
351
352    #[inline]
353    pub fn mark_oneway_rpc_ref(&mut self) {
354        let mark = 1 << Self::RPC_ONEWAY;
355        self.flag |= mark;
356    }
357
358    #[inline]
359    pub fn get_serialize_type(&self) -> SerializeType {
360        self.serialize_type
361    }
362
363    /// Encode header with optimized path selection
364    #[inline]
365    pub fn header_encode(&mut self) -> Option<Bytes> {
366        self.make_custom_header_to_net();
367        match self.serialize_type {
368            SerializeType::ROCKETMQ => Some(RocketMQSerializable::rocket_mq_protocol_encode_bytes(self)),
369            SerializeType::JSON => {
370                #[cfg(feature = "simd")]
371                {
372                    match simd_json::to_vec(self) {
373                        Ok(value) => Some(Bytes::from(value)),
374                        Err(e) => {
375                            error!("Failed to encode JSON header with simd-json: {}", e);
376                            None
377                        }
378                    }
379                }
380                #[cfg(not(feature = "simd"))]
381                {
382                    match serde_json::to_vec(self) {
383                        Ok(value) => Some(Bytes::from(value)),
384                        Err(e) => {
385                            error!("Failed to encode JSON header: {}", e);
386                            None
387                        }
388                    }
389                }
390            }
391        }
392    }
393
394    /// Encode header with body length information
395    #[inline]
396    pub fn encode_header(&mut self) -> Option<Bytes> {
397        let body_length = self.body.as_ref().map_or(0, |b| b.len());
398        self.encode_header_with_body_length(body_length)
399    }
400
401    /// Optimized header encoding with pre-calculated capacity
402    #[inline]
403    pub fn encode_header_with_body_length(&mut self, body_length: usize) -> Option<Bytes> {
404        // Encode header data
405        let header_data = self.header_encode()?;
406        let header_len = header_data.len();
407
408        // Calculate frame size: 4 (total_length) + 4 (serialize_type) + header_len
409        let frame_header_size = 8;
410        let total_length = 4 + header_len + body_length; // 4 is for serialize_type field
411
412        // Allocate exact capacity
413        let mut result = BytesMut::with_capacity(frame_header_size + header_len);
414
415        // Write total length
416        result.put_i32(total_length as i32);
417
418        // Write serialize type with embedded header length
419        result.put_i32(mark_protocol_type(header_len as i32, self.serialize_type));
420
421        // Write header data
422        result.put(header_data);
423
424        Some(result.freeze())
425    }
426
427    /// Convert custom header to network format (merge into ext_fields)
428    #[inline]
429    pub fn make_custom_header_to_net(&mut self) {
430        if let Some(header) = &self.command_custom_header {
431            if let Some(header_map) = header.to_map() {
432                match &mut self.ext_fields {
433                    None => {
434                        self.ext_fields = Some(header_map);
435                    }
436                    Some(ext) => {
437                        // Merge header map into existing ext_fields
438                        for (key, value) in header_map {
439                            ext.insert(key, value);
440                        }
441                    }
442                }
443            }
444        }
445    }
446
447    #[inline]
448    pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
449        match self.serialize_type {
450            SerializeType::JSON => {
451                self.fast_encode_json(dst);
452            }
453            SerializeType::ROCKETMQ => {
454                self.fast_encode_rocketmq(dst);
455            }
456        }
457    }
458
459    /// Optimized JSON encoding with pre-calculated capacity and zero-copy optimizations
460    #[inline]
461    fn fast_encode_json(&mut self, dst: &mut BytesMut) {
462        self.make_custom_header_to_net();
463
464        // Pre-calculate approximate header size to reduce reallocations
465        let estimated_header_size = self.estimate_json_header_size();
466        let body_length = self.body.as_ref().map_or(0, |b| b.len());
467
468        // Reserve space upfront: 4 (total_length) + 4 (serialize_type) + estimated_header + body
469        dst.reserve(8 + estimated_header_size + body_length);
470
471        // Encode header using simd-json for better performance when available
472        #[cfg(feature = "simd")]
473        let encode_result = simd_json::to_vec(self);
474
475        #[cfg(not(feature = "simd"))]
476        let encode_result = serde_json::to_vec(self);
477
478        match encode_result {
479            Ok(header_bytes) => {
480                let header_length = header_bytes.len() as i32;
481                let body_length = body_length as i32;
482                let total_length = 4 + header_length + body_length;
483
484                // Write frame header
485                dst.put_i32(total_length);
486                dst.put_i32(RemotingCommand::mark_serialize_type(header_length, SerializeType::JSON));
487
488                // Write header bytes (zero-copy from Vec)
489                dst.put_slice(&header_bytes);
490            }
491            Err(e) => {
492                error!("Failed to encode JSON header: {}", e);
493                // Write minimal error frame
494                dst.put_i32(4); // total_length: just the serialize_type field
495                dst.put_i32(RemotingCommand::mark_serialize_type(0, SerializeType::JSON));
496            }
497        }
498    }
499
500    /// Optimized ROCKETMQ binary encoding with minimal allocations
501    #[inline]
502    fn fast_encode_rocketmq(&mut self, dst: &mut BytesMut) {
503        let begin_index = dst.len();
504
505        // Reserve space for total_length (4 bytes) and serialize_type (4 bytes)
506        dst.reserve(8);
507        dst.put_i64(0); // Placeholder for total_length + serialize_type
508
509        // Check if custom header supports fast codec
510        if let Some(header) = self.command_custom_header_ref() {
511            if !header.support_fast_codec() {
512                self.make_custom_header_to_net();
513            }
514        }
515
516        // Encode header directly to buffer
517        let header_size = RocketMQSerializable::rocketmq_protocol_encode(self, dst);
518        let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
519
520        // Calculate serialize type with header length embedded
521        let serialize_type = RemotingCommand::mark_serialize_type(header_size as i32, SerializeType::ROCKETMQ);
522
523        // Write total_length and serialize_type at the beginning (in-place update)
524        let total_length = (header_size as i32 + body_length).to_be_bytes();
525        let serialize_type_bytes = serialize_type.to_be_bytes();
526
527        dst[begin_index..begin_index + 4].copy_from_slice(&total_length);
528        dst[begin_index + 4..begin_index + 8].copy_from_slice(&serialize_type_bytes);
529    }
530
531    /// Estimate JSON header size to reduce buffer reallocations
532    /// This is an approximation based on typical field sizes
533    #[inline]
534    fn estimate_json_header_size(&self) -> usize {
535        let mut size = 100; // Base JSON overhead
536
537        if let Some(ref remark) = self.remark {
538            size += remark.len() + 20; // "remark":"..." + quotes
539        }
540
541        if let Some(ref ext) = self.ext_fields {
542            // Approximate: each entry adds ~30 bytes overhead + key/value lengths
543            size += ext.iter().map(|(k, v)| k.len() + v.len() + 30).sum::<usize>();
544        }
545
546        size
547    }
548
549    /// Optimized decode with enhanced boundary checks and reduced allocations
550    #[inline]
551    pub fn decode(src: &mut BytesMut) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
552        const FRAME_HEADER_SIZE: usize = 4;
553        const SERIALIZE_TYPE_SIZE: usize = 4;
554        const MIN_PAYLOAD_SIZE: usize = SERIALIZE_TYPE_SIZE; // Minimum: just serialize_type field
555
556        let available = src.len();
557
558        // Early return if not enough data for frame header
559        if available < FRAME_HEADER_SIZE {
560            return Ok(None);
561        }
562
563        // Read total size without advancing the buffer (peek)
564        let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
565
566        // Validate total_size to prevent overflow attacks (check max first)
567        if total_size > 16 * 1024 * 1024 {
568            return Err(rocketmq_error::RocketMQError::Serialization(
569                rocketmq_error::SerializationError::DecodeFailed {
570                    format: "remoting_command",
571                    message: format!("Frame size {total_size} exceeds maximum allowed (16MB)"),
572                },
573            ));
574        }
575
576        // Wait for complete frame
577        let full_frame_size = total_size + FRAME_HEADER_SIZE;
578        if available < full_frame_size {
579            return Ok(None);
580        }
581
582        // Now validate minimum total_size (we have the complete frame)
583        if total_size < MIN_PAYLOAD_SIZE {
584            return Err(rocketmq_error::RocketMQError::Serialization(
585                rocketmq_error::SerializationError::DecodeFailed {
586                    format: "remoting_command",
587                    message: format!("Invalid total_size {total_size}, minimum required is {MIN_PAYLOAD_SIZE}"),
588                },
589            ));
590        }
591
592        // Extract complete frame (zero-copy split)
593        let mut cmd_data = src.split_to(full_frame_size);
594        cmd_data.advance(FRAME_HEADER_SIZE); // Skip total_size field
595
596        // Ensure we have serialize_type field (should always pass after above checks)
597        if cmd_data.remaining() < SERIALIZE_TYPE_SIZE {
598            return Err(rocketmq_error::RocketMQError::Serialization(
599                rocketmq_error::SerializationError::DecodeFailed {
600                    format: "remoting_command",
601                    message: "Incomplete serialize_type field".to_string(),
602                },
603            ));
604        }
605
606        // Parse header length and protocol type
607        let ori_header_length = cmd_data.get_i32();
608        let header_length = parse_header_length(ori_header_length);
609
610        // Validate header length
611        if header_length > total_size - SERIALIZE_TYPE_SIZE {
612            return Err(rocketmq_error::RocketMQError::Serialization(
613                rocketmq_error::SerializationError::DecodeFailed {
614                    format: "remoting_command",
615                    message: format!("Invalid header length {header_length}, total size {total_size}"),
616                },
617            ));
618        }
619
620        let protocol_type = parse_serialize_type(ori_header_length)?;
621
622        // Split header and body (zero-copy)
623        let mut header_data = cmd_data.split_to(header_length);
624
625        // Decode header
626        let mut cmd = RemotingCommand::header_decode(&mut header_data, header_length, protocol_type)?;
627
628        // Attach body if present (zero-copy freeze)
629        if let Some(ref mut cmd) = cmd {
630            let body_length = total_size - SERIALIZE_TYPE_SIZE - header_length;
631            if body_length > 0 {
632                if cmd_data.remaining() >= body_length {
633                    cmd.set_body_mut_ref(cmd_data.split_to(body_length).freeze());
634                } else {
635                    return Err(rocketmq_error::RocketMQError::Serialization(
636                        rocketmq_error::SerializationError::DecodeFailed {
637                            format: "remoting_command",
638                            message: format!(
639                                "Insufficient body data: expected {body_length}, available {}",
640                                cmd_data.remaining()
641                            ),
642                        },
643                    ));
644                }
645            }
646        }
647
648        Ok(cmd)
649    }
650
651    /// Optimized header decoding with type-based dispatch
652    #[inline]
653    pub fn header_decode(
654        src: &mut BytesMut,
655        header_length: usize,
656        type_: SerializeType,
657    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
658        match type_ {
659            SerializeType::JSON => {
660                // Deserialize JSON header using simd-json when available
661                #[cfg(feature = "simd")]
662                let cmd = {
663                    let mut slice = src.split_to(header_length).to_vec();
664                    simd_json::from_slice::<RemotingCommand>(&mut slice).map_err(|error| {
665                        rocketmq_error::RocketMQError::Serialization(rocketmq_error::SerializationError::DecodeFailed {
666                            format: "json",
667                            message: format!("SIMD JSON deserialization error: {error}"),
668                        })
669                    })?
670                };
671
672                #[cfg(not(feature = "simd"))]
673                let cmd = SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
674                    rocketmq_error::RocketMQError::Serialization(rocketmq_error::SerializationError::DecodeFailed {
675                        format: "json",
676                        message: format!("JSON deserialization error: {error}"),
677                    })
678                })?;
679
680                Ok(Some(cmd.set_serialize_type(SerializeType::JSON)))
681            }
682            SerializeType::ROCKETMQ => {
683                // Deserialize binary header
684                let cmd = RocketMQSerializable::rocket_mq_protocol_decode(src, header_length)?;
685                Ok(Some(cmd.set_serialize_type(SerializeType::ROCKETMQ)))
686            }
687        }
688    }
689
690    #[inline]
691    pub fn get_body(&self) -> Option<&Bytes> {
692        self.body.as_ref()
693    }
694
    /// Mutably borrows the body, if any.
    #[inline]
    pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
        self.body.as_mut()
    }
699
700    #[inline]
701    pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
702        ((protocol_type.get_code() as i32) << 24) | (header_length & 0x00FFFFFF)
703    }
704
    /// Returns the command code.
    #[inline]
    pub fn code(&self) -> i32 {
        self.code
    }
709
710    #[inline]
711    pub fn request_code(&self) -> RequestCode {
712        RequestCode::from(self.code)
713    }
714
    /// Borrows the command code.
    #[inline]
    pub fn code_ref(&self) -> &i32 {
        &self.code
    }
719
    /// Returns the language code.
    #[inline]
    pub fn language(&self) -> LanguageCode {
        self.language
    }
724
    /// Returns the raw protocol version number.
    #[inline]
    pub fn version(&self) -> i32 {
        self.version
    }
729
    /// Maps the raw version number to a `RocketMqVersion` by ordinal.
    ///
    /// NOTE(review): the `as u32` cast wraps for a negative `version`; how
    /// `from_ordinal` treats out-of-range ordinals is not visible here — confirm.
    pub fn rocketmq_version(&self) -> RocketMqVersion {
        RocketMqVersion::from_ordinal(self.version as u32)
    }
733
    /// Returns the correlation id.
    #[inline]
    pub fn opaque(&self) -> i32 {
        self.opaque
    }
738
    /// Returns the raw flag word.
    #[inline]
    pub fn flag(&self) -> i32 {
        self.flag
    }
743
    /// Borrows the remark, if any.
    #[inline]
    pub fn remark(&self) -> Option<&CheetahString> {
        self.remark.as_ref()
    }
748
    /// Borrows the extension-field map, if it has been initialized.
    #[inline]
    pub fn ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
        self.ext_fields.as_ref()
    }
753
    /// Borrows the body, if any.
    #[inline]
    pub fn body(&self) -> Option<&Bytes> {
        self.body.as_ref()
    }
758
759    #[inline]
760    pub fn take_body(&mut self) -> Option<Bytes> {
761        self.body.take()
762    }
763
    /// Returns the `suspended` flag.
    #[inline]
    pub fn suspended(&self) -> bool {
        self.suspended
    }
768
    /// Returns the serialization type of this command.
    #[inline]
    pub fn serialize_type(&self) -> SerializeType {
        self.serialize_type
    }
773
774    pub fn decode_command_custom_header<T>(&self) -> rocketmq_error::RocketMQResult<T>
775    where
776        T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
777    {
778        match self.ext_fields {
779            None => Err(rocketmq_error::RocketMQError::Serialization(
780                rocketmq_error::SerializationError::DecodeFailed {
781                    format: "header",
782                    message: "ExtFields is None".to_string(),
783                },
784            )),
785            Some(ref header) => T::from(header),
786        }
787    }
788
789    pub fn decode_command_custom_header_fast<T>(&self) -> rocketmq_error::RocketMQResult<T>
790    where
791        T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
792        T: Default + CommandCustomHeader,
793    {
794        match self.ext_fields {
795            None => Err(rocketmq_error::RocketMQError::Serialization(
796                rocketmq_error::SerializationError::DecodeFailed {
797                    format: "header",
798                    message: "ExtFields is None".to_string(),
799                },
800            )),
801            Some(ref header) => {
802                let mut target = T::default();
803                if target.support_fast_codec() {
804                    target.decode_fast(header)?;
805                    Ok(target)
806                } else {
807                    T::from(header)
808                }
809            }
810        }
811    }
812
813    #[inline]
814    pub fn is_response_type(&self) -> bool {
815        let bits = 1 << Self::RPC_TYPE;
816        (self.flag & bits) == bits
817    }
818
819    #[inline]
820    pub fn is_oneway_rpc(&self) -> bool {
821        let bits = 1 << Self::RPC_ONEWAY;
822        (self.flag & bits) == bits
823    }
824
825    pub fn get_type(&self) -> RemotingCommandType {
826        if self.is_response_type() {
827            RemotingCommandType::RESPONSE
828        } else {
829            RemotingCommandType::REQUEST
830        }
831    }
832
833    #[inline]
834    pub fn with_opaque(&mut self, opaque: i32) -> &mut Self {
835        self.opaque = opaque;
836        self
837    }
838
839    pub fn add_ext_field(&mut self, key: impl Into<CheetahString>, value: impl Into<CheetahString>) -> &mut Self {
840        if let Some(ref mut ext) = self.ext_fields {
841            ext.insert(key.into(), value.into());
842        }
843        self
844    }
845
846    #[inline]
847    pub fn with_code(&mut self, code: impl Into<i32>) -> &mut Self {
848        self.code = code.into();
849        self
850    }
851
852    #[inline]
853    pub fn with_remark(&mut self, remark: impl Into<CheetahString>) -> &mut Self {
854        self.remark = Some(remark.into());
855        self
856    }
857
858    #[inline]
859    pub fn get_ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
860        self.ext_fields.as_ref()
861    }
862
863    pub fn read_custom_header_ref<T>(&self) -> Option<&T>
864    where
865        T: CommandCustomHeader + Sync + Send + 'static,
866    {
867        match self.command_custom_header.as_ref() {
868            None => None,
869            Some(value) => value.as_ref().as_any().downcast_ref::<T>(),
870        }
871    }
872
873    pub fn read_custom_header_ref_unchecked<T>(&self) -> &T
874    where
875        T: CommandCustomHeader + Sync + Send + 'static,
876    {
877        match self.command_custom_header.as_ref() {
878            None => unsafe { hint::unreachable_unchecked() },
879            Some(value) => value.as_ref().as_any().downcast_ref::<T>().unwrap(),
880        }
881    }
882
883    pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
884    where
885        T: CommandCustomHeader + Sync + Send + 'static,
886    {
887        match self.command_custom_header.as_mut() {
888            None => None,
889            Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>(),
890        }
891    }
892
893    pub fn read_custom_header_mut_from_ref<T>(&self) -> Option<&mut T>
894    where
895        T: CommandCustomHeader + Sync + Send + 'static,
896    {
897        match self.command_custom_header.as_ref() {
898            None => None,
899            Some(value) => value.mut_from_ref().as_any_mut().downcast_mut::<T>(),
900        }
901    }
902
903    pub fn read_custom_header_mut_unchecked<T>(&mut self) -> &mut T
904    where
905        T: CommandCustomHeader + Sync + Send + 'static,
906    {
907        match self.command_custom_header.as_mut() {
908            None => unsafe { hint::unreachable_unchecked() },
909            Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>().unwrap(),
910        }
911    }
912
913    pub fn command_custom_header_ref(&self) -> Option<&dyn CommandCustomHeader> {
914        match self.command_custom_header.as_ref() {
915            None => None,
916            Some(value) => Some(value.as_ref().as_ref()),
917        }
918    }
919
920    pub fn command_custom_header_mut(&mut self) -> Option<&mut dyn CommandCustomHeader> {
921        match self.command_custom_header.as_mut() {
922            None => None,
923            Some(value) => Some(value.as_mut().as_mut()),
924        }
925    }
926
927    pub fn create_new_request_id() -> i32 {
928        REQUEST_ID.fetch_add(1, Ordering::AcqRel)
929    }
930
931    #[inline]
932    pub fn add_ext_field_if_not_exist(&mut self, key: impl Into<CheetahString>, value: impl Into<CheetahString>) {
933        if let Some(ref mut ext) = self.ext_fields {
934            ext.entry(key.into()).or_insert(value.into());
935        }
936    }
937
938    /// Ensures the extension fields map is initialized.
939    ///
940    /// If `ext_fields` is `None`, initializes it with an empty `HashMap`.
941    /// This method is idempotent and safe to call multiple times.
942    #[inline]
943    pub fn ensure_ext_fields_initialized(&mut self) {
944        if self.ext_fields.is_none() {
945            self.ext_fields = Some(std::collections::HashMap::new());
946        }
947    }
948}
949
/// Returns the header length carried in the low 24 bits of the combined
/// serialize-type / length field.
#[inline]
pub fn parse_header_length(size: i32) -> usize {
    // Everything above the low three bytes belongs to the serialize type and is masked off.
    const HEADER_LENGTH_MASK: i32 = 0x00FF_FFFF;
    let header_length = size & HEADER_LENGTH_MASK;
    header_length as usize
}
955
956/// Combine serialize type code with header length
957#[inline]
958pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
959    ((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
960}
961
962/// Extract serialize type from the combined field
963#[inline]
964pub fn parse_serialize_type(size: i32) -> rocketmq_error::RocketMQResult<SerializeType> {
965    let code = (size >> 24) as u8;
966    SerializeType::value_of(code).ok_or({
967        rocketmq_error::RocketMQError::Protocol(rocketmq_error::ProtocolError::UnsupportedSerializationType {
968            serialize_type: code,
969        })
970    })
971}
972
973impl AsRef<RemotingCommand> for RemotingCommand {
974    #[inline]
975    fn as_ref(&self) -> &RemotingCommand {
976        self
977    }
978}
979
980impl AsMut<RemotingCommand> for RemotingCommand {
981    #[inline]
982    fn as_mut(&mut self) -> &mut RemotingCommand {
983        self
984    }
985}
986
#[cfg(test)]
mod tests {
    use super::*;

    /// A populated command must serialize to the exact JSON layout expected on the wire.
    #[test]
    fn test_remoting_command() {
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_ext_fields(HashMap::new())
            .set_remark_option(Some("remark".to_string()));

        assert_eq!(
            "{\"code\":1,\"language\":\"JAVA\",\"version\":0,\"opaque\":1,\"flag\":1,\"remark\":\"remark\",\"\
             extFields\":{},\"serializeTypeCurrentRPC\":\"JSON\"}",
            serde_json::to_string(&command).unwrap()
        );
    }

    /// Marking a length with the JSON serialize type leaves the 24-bit length unchanged.
    #[test]
    fn test_mark_serialize_type() {
        let i = RemotingCommand::mark_serialize_type(261, SerializeType::JSON);
        assert_eq!(i, 261);

        let i = RemotingCommand::mark_serialize_type(16777215, SerializeType::JSON);
        assert_eq!(i, 16777215);
    }

    /// Only the low 24 bits of the combined field are reported as header length.
    #[test]
    fn test_parse_header_length_masks_top_byte() {
        assert_eq!(parse_header_length(0), 0);
        assert_eq!(parse_header_length(261), 261);
        assert_eq!(parse_header_length(0x0100_0105), 261);
        assert_eq!(parse_header_length(-1), 0x00FF_FFFF);
    }

    /// Packing and then unpacking must give back both the length and the serialize type.
    #[test]
    fn test_mark_protocol_type_round_trip() {
        let combined = mark_protocol_type(261, SerializeType::JSON);
        assert_eq!(parse_header_length(combined), 261);
        assert!(matches!(parse_serialize_type(combined), Ok(SerializeType::JSON)));
    }
}