1use std::collections::HashMap;
19use std::fmt;
20use std::hint;
21use std::sync::atomic::AtomicI32;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::sync::Once;
25
26use bytes::Buf;
27use bytes::BufMut;
28use bytes::Bytes;
29use bytes::BytesMut;
30use cheetah_string::CheetahString;
31use lazy_static::lazy_static;
32use rocketmq_common::common::mq_version::RocketMqVersion;
33#[cfg(not(feature = "simd"))]
34use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
35use rocketmq_common::EnvUtils::EnvUtils;
36use rocketmq_rust::ArcMut;
37use serde::Deserialize;
38use serde::Serialize;
39use tracing::error;
40
41use super::RemotingCommandType;
42use super::SerializeType;
43use crate::code::request_code::RequestCode;
44use crate::code::response_code::RemotingSysResponseCode;
45use crate::protocol::command_custom_header::CommandCustomHeader;
46use crate::protocol::command_custom_header::FromMap;
47use crate::protocol::LanguageCode;
48use crate::rocketmq_serializable::RocketMQSerializable;
49
/// Name looked up first (through `std::env::var`) when resolving the
/// serialization type used by this process.
pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";

/// Environment variable consulted when [`SERIALIZE_TYPE_PROPERTY`] is not set.
pub const SERIALIZE_TYPE_ENV: &str = "ROCKETMQ_SERIALIZE_TYPE";

/// Property key holding the remoting version stamped onto request commands.
pub const REMOTING_VERSION_KEY: &str = "rocketmq.remoting.version";
53
54lazy_static! {
55 static ref requestId: Arc<AtomicI32> = Arc::new(AtomicI32::new(0));
56 static ref CONFIG_VERSION: i32 = {
57 EnvUtils::get_property(REMOTING_VERSION_KEY)
58 .unwrap_or(String::from("0"))
59 .parse::<i32>()
60 .unwrap_or(0)
61 };
62 static ref INIT: Once = Once::new();
63 pub static ref SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER: SerializeType = {
64 let protocol = std::env::var(SERIALIZE_TYPE_PROPERTY).unwrap_or_else(|_| {
65 std::env::var(SERIALIZE_TYPE_ENV).unwrap_or_else(|_| "".to_string())
66 });
67 match protocol.as_str() {
68 "JSON" => SerializeType::JSON,
69 "ROCKETMQ" => SerializeType::ROCKETMQ,
70 _ => SerializeType::JSON,
71 }
72 };
73}
74
75fn set_cmd_version(cmd: &mut RemotingCommand) {
76 cmd.set_version_ref(*CONFIG_VERSION);
77}
78
79#[derive(Serialize, Deserialize)]
80pub struct RemotingCommand {
81 code: i32,
82 language: LanguageCode,
83 version: i32,
84 opaque: i32,
85
86 flag: i32,
92 remark: Option<CheetahString>,
93
94 #[serde(rename = "extFields")]
95 ext_fields: Option<HashMap<CheetahString, CheetahString>>,
96
97 #[serde(skip)]
98 body: Option<Bytes>,
99 #[serde(skip)]
100 suspended: bool,
101 #[serde(skip)]
102 command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
103 #[serde(rename = "serializeTypeCurrentRPC")]
104 serialize_type: SerializeType,
105}
106
107impl Clone for RemotingCommand {
108 fn clone(&self) -> Self {
109 Self {
110 code: self.code,
111 language: self.language,
112 version: self.version,
113 opaque: self.opaque,
114 flag: self.flag,
115 remark: self.remark.clone(),
116 ext_fields: self.ext_fields.clone(),
117 body: self.body.clone(),
118 suspended: self.suspended,
119 command_custom_header: self.command_custom_header.clone(),
120 serialize_type: self.serialize_type,
121 }
122 }
123}
124
125impl fmt::Display for RemotingCommand {
126 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127 write!(
128 f,
129 "RemotingCommand [code={}, language={}, version={}, opaque={}, flag(B)={:b}, \
130 remark={}, extFields={:?}, serializeTypeCurrentRPC={}]",
131 self.code,
132 self.language,
133 self.version,
134 self.opaque,
135 self.flag,
136 self.remark.as_ref().unwrap_or(&CheetahString::default()),
137 self.ext_fields,
138 self.serialize_type
139 )
140 }
141}
142
143impl Default for RemotingCommand {
144 fn default() -> Self {
145 let opaque = requestId.fetch_add(1, Ordering::AcqRel);
146 RemotingCommand {
147 code: 0,
148 language: LanguageCode::RUST, version: 0,
150 opaque,
151 flag: 0,
152 remark: None,
153 ext_fields: None,
154 body: None,
155 suspended: false,
156 command_custom_header: None,
157 serialize_type: *SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER,
158 }
159 }
160}
161
162impl RemotingCommand {
163 pub(crate) const RPC_ONEWAY: i32 = 1;
164 pub(crate) const RPC_TYPE: i32 = 0;
165}
166
167impl RemotingCommand {
168 pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
169 Self::default().set_code(code).set_body(body)
170 }
171
172 pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
173 where
174 T: CommandCustomHeader + Sync + Send + 'static,
175 {
176 let mut command = Self::default()
177 .set_code(code.into())
178 .set_command_custom_header(header);
179 set_cmd_version(&mut command);
180 command
181 }
182
183 pub fn create_remoting_command(code: impl Into<i32>) -> Self {
184 let command = Self::default();
185 command.set_code(code.into())
186 }
187
188 pub fn get_and_add() -> i32 {
189 requestId.fetch_add(1, Ordering::AcqRel)
190 }
191
192 pub fn create_response_command_with_code(code: impl Into<i32>) -> Self {
193 Self::default().set_code(code).mark_response_type()
194 }
195
196 pub fn create_response_command_with_code_remark(
197 code: impl Into<i32>,
198 remark: impl Into<CheetahString>,
199 ) -> Self {
200 Self::default()
201 .set_code(code)
202 .set_remark_option(Some(remark.into()))
203 .mark_response_type()
204 }
205
206 pub fn create_response_command() -> Self {
207 Self::default()
208 .set_code(RemotingSysResponseCode::Success)
209 .mark_response_type()
210 }
211
212 pub fn create_response_command_with_header(
213 header: impl CommandCustomHeader + Sync + Send + 'static,
214 ) -> Self {
215 Self::default()
216 .set_code(RemotingSysResponseCode::Success)
217 .set_command_custom_header(header)
218 .mark_response_type()
219 }
220
221 pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
222 where
223 T: CommandCustomHeader + Sync + Send + 'static,
224 {
225 self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
226 self
227 }
228
229 pub fn set_command_custom_header_origin(
230 mut self,
231 command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
232 ) -> Self {
233 self.command_custom_header = command_custom_header;
234 self
235 }
236
237 pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
238 where
239 T: CommandCustomHeader + Sync + Send + 'static,
240 {
241 self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
242 }
243
244 pub fn set_code(mut self, code: impl Into<i32>) -> Self {
245 self.code = code.into();
246 self
247 }
248
249 pub fn set_code_ref(&mut self, code: impl Into<i32>) {
250 self.code = code.into();
251 }
252
253 pub fn set_code_mut(&mut self, code: impl Into<i32>) -> &mut Self {
254 self.code = code.into();
255 self
256 }
257
258 pub fn set_language(mut self, language: LanguageCode) -> Self {
259 self.language = language;
260 self
261 }
262
263 pub fn set_version_ref(&mut self, version: i32) {
264 self.version = version;
265 }
266
267 pub fn set_version(mut self, version: i32) -> Self {
268 self.version = version;
269 self
270 }
271
272 #[inline]
273 pub fn set_opaque(mut self, opaque: i32) -> Self {
274 self.opaque = opaque;
275 self
276 }
277
278 #[inline]
279 pub fn set_opaque_mut(&mut self, opaque: i32) {
280 self.opaque = opaque;
281 }
282
283 #[inline]
284 pub fn set_flag(mut self, flag: i32) -> Self {
285 self.flag = flag;
286 self
287 }
288
289 #[inline]
290 pub fn set_remark_option(mut self, remark: Option<impl Into<CheetahString>>) -> Self {
291 self.remark = remark.map(|item| item.into());
292 self
293 }
294
295 #[inline]
296 pub fn set_remark(mut self, remark: impl Into<CheetahString>) -> Self {
297 self.remark = Some(remark.into());
298 self
299 }
300
301 #[inline]
302 pub fn set_remark_option_mut(&mut self, remark: Option<impl Into<CheetahString>>) {
303 self.remark = remark.map(|item| item.into());
304 }
305
306 #[inline]
307 pub fn set_remark_mut(&mut self, remark: impl Into<CheetahString>) {
308 self.remark = Some(remark.into());
309 }
310
311 #[inline]
312 pub fn set_ext_fields(mut self, ext_fields: HashMap<CheetahString, CheetahString>) -> Self {
313 self.ext_fields = Some(ext_fields);
314 self
315 }
316
317 #[inline]
318 pub fn set_body(mut self, body: impl Into<Bytes>) -> Self {
319 self.body = Some(body.into());
320 self
321 }
322
323 #[inline]
324 pub fn set_body_mut_ref(&mut self, body: impl Into<Bytes>) {
325 self.body = Some(body.into());
326 }
327
328 #[inline]
329 pub fn set_suspended(mut self, suspended: bool) -> Self {
330 self.suspended = suspended;
331 self
332 }
333
334 #[inline]
335 pub fn set_suspended_ref(&mut self, suspended: bool) {
336 self.suspended = suspended;
337 }
338
339 #[inline]
340 pub fn set_serialize_type(mut self, serialize_type: SerializeType) -> Self {
341 self.serialize_type = serialize_type;
342 self
343 }
344
345 #[inline]
346 pub fn mark_response_type(mut self) -> Self {
347 let mark = 1 << Self::RPC_TYPE;
348 self.flag |= mark;
349 self
350 }
351
352 #[inline]
353 pub fn mark_response_type_ref(&mut self) {
354 let mark = 1 << Self::RPC_TYPE;
355 self.flag |= mark;
356 }
357
358 #[inline]
359 pub fn mark_oneway_rpc(mut self) -> Self {
360 let mark = 1 << Self::RPC_ONEWAY;
361 self.flag |= mark;
362 self
363 }
364
365 #[inline]
366 pub fn mark_oneway_rpc_ref(&mut self) {
367 let mark = 1 << Self::RPC_ONEWAY;
368 self.flag |= mark;
369 }
370
371 #[inline]
372 pub fn get_serialize_type(&self) -> SerializeType {
373 self.serialize_type
374 }
375
376 #[inline]
378 pub fn header_encode(&mut self) -> Option<Bytes> {
379 self.make_custom_header_to_net();
380 match self.serialize_type {
381 SerializeType::ROCKETMQ => {
382 Some(RocketMQSerializable::rocket_mq_protocol_encode_bytes(self))
383 }
384 SerializeType::JSON => {
385 #[cfg(feature = "simd")]
386 {
387 match simd_json::to_vec(self) {
388 Ok(value) => Some(Bytes::from(value)),
389 Err(e) => {
390 error!("Failed to encode JSON header with simd-json: {}", e);
391 None
392 }
393 }
394 }
395 #[cfg(not(feature = "simd"))]
396 {
397 match serde_json::to_vec(self) {
398 Ok(value) => Some(Bytes::from(value)),
399 Err(e) => {
400 error!("Failed to encode JSON header: {}", e);
401 None
402 }
403 }
404 }
405 }
406 }
407 }
408
409 #[inline]
411 pub fn encode_header(&mut self) -> Option<Bytes> {
412 let body_length = self.body.as_ref().map_or(0, |b| b.len());
413 self.encode_header_with_body_length(body_length)
414 }
415
416 #[inline]
418 pub fn encode_header_with_body_length(&mut self, body_length: usize) -> Option<Bytes> {
419 let header_data = self.header_encode()?;
421 let header_len = header_data.len();
422
423 let frame_header_size = 8;
425 let total_length = 4 + header_len + body_length; let mut result = BytesMut::with_capacity(frame_header_size + header_len);
429
430 result.put_i32(total_length as i32);
432
433 result.put_i32(mark_protocol_type(header_len as i32, self.serialize_type));
435
436 result.put(header_data);
438
439 Some(result.freeze())
440 }
441
442 #[inline]
444 pub fn make_custom_header_to_net(&mut self) {
445 if let Some(header) = &self.command_custom_header {
446 if let Some(header_map) = header.to_map() {
447 match &mut self.ext_fields {
448 None => {
449 self.ext_fields = Some(header_map);
450 }
451 Some(ext) => {
452 for (key, value) in header_map {
454 ext.insert(key, value);
455 }
456 }
457 }
458 }
459 }
460 }
461
462 #[inline]
463 pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
464 match self.serialize_type {
465 SerializeType::JSON => {
466 self.fast_encode_json(dst);
467 }
468 SerializeType::ROCKETMQ => {
469 self.fast_encode_rocketmq(dst);
470 }
471 }
472 }
473
474 #[inline]
476 fn fast_encode_json(&mut self, dst: &mut BytesMut) {
477 self.make_custom_header_to_net();
478
479 let estimated_header_size = self.estimate_json_header_size();
481 let body_length = self.body.as_ref().map_or(0, |b| b.len());
482
483 dst.reserve(8 + estimated_header_size + body_length);
485
486 #[cfg(feature = "simd")]
488 let encode_result = simd_json::to_vec(self);
489
490 #[cfg(not(feature = "simd"))]
491 let encode_result = serde_json::to_vec(self);
492
493 match encode_result {
494 Ok(header_bytes) => {
495 let header_length = header_bytes.len() as i32;
496 let body_length = body_length as i32;
497 let total_length = 4 + header_length + body_length;
498
499 dst.put_i32(total_length);
501 dst.put_i32(RemotingCommand::mark_serialize_type(
502 header_length,
503 SerializeType::JSON,
504 ));
505
506 dst.put_slice(&header_bytes);
508 }
509 Err(e) => {
510 error!("Failed to encode JSON header: {}", e);
511 dst.put_i32(4); dst.put_i32(RemotingCommand::mark_serialize_type(0, SerializeType::JSON));
514 }
515 }
516 }
517
518 #[inline]
520 fn fast_encode_rocketmq(&mut self, dst: &mut BytesMut) {
521 let begin_index = dst.len();
522
523 dst.reserve(8);
525 dst.put_i64(0); if let Some(header) = self.command_custom_header_ref() {
529 if !header.support_fast_codec() {
530 self.make_custom_header_to_net();
531 }
532 }
533
534 let header_size = RocketMQSerializable::rocketmq_protocol_encode(self, dst);
536 let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
537
538 let serialize_type =
540 RemotingCommand::mark_serialize_type(header_size as i32, SerializeType::ROCKETMQ);
541
542 let total_length = (header_size as i32 + body_length).to_be_bytes();
544 let serialize_type_bytes = serialize_type.to_be_bytes();
545
546 dst[begin_index..begin_index + 4].copy_from_slice(&total_length);
547 dst[begin_index + 4..begin_index + 8].copy_from_slice(&serialize_type_bytes);
548 }
549
550 #[inline]
553 fn estimate_json_header_size(&self) -> usize {
554 let mut size = 100; if let Some(ref remark) = self.remark {
557 size += remark.len() + 20; }
559
560 if let Some(ref ext) = self.ext_fields {
561 size += ext
563 .iter()
564 .map(|(k, v)| k.len() + v.len() + 30)
565 .sum::<usize>();
566 }
567
568 size
569 }
570
571 #[inline]
573 pub fn decode(src: &mut BytesMut) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
574 const FRAME_HEADER_SIZE: usize = 4;
575 const SERIALIZE_TYPE_SIZE: usize = 4;
576 const MIN_PAYLOAD_SIZE: usize = SERIALIZE_TYPE_SIZE; let available = src.len();
579
580 if available < FRAME_HEADER_SIZE {
582 return Ok(None);
583 }
584
585 let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
587
588 if total_size > 16 * 1024 * 1024 {
590 return Err(rocketmq_error::RocketMQError::Serialization(
591 rocketmq_error::SerializationError::DecodeFailed {
592 format: "remoting_command",
593 message: format!("Frame size {total_size} exceeds maximum allowed (16MB)"),
594 },
595 ));
596 }
597
598 let full_frame_size = total_size + FRAME_HEADER_SIZE;
600 if available < full_frame_size {
601 return Ok(None);
602 }
603
604 if total_size < MIN_PAYLOAD_SIZE {
606 return Err(rocketmq_error::RocketMQError::Serialization(
607 rocketmq_error::SerializationError::DecodeFailed {
608 format: "remoting_command",
609 message: format!(
610 "Invalid total_size {total_size}, minimum required is {MIN_PAYLOAD_SIZE}"
611 ),
612 },
613 ));
614 }
615
616 let mut cmd_data = src.split_to(full_frame_size);
618 cmd_data.advance(FRAME_HEADER_SIZE); if cmd_data.remaining() < SERIALIZE_TYPE_SIZE {
622 return Err(rocketmq_error::RocketMQError::Serialization(
623 rocketmq_error::SerializationError::DecodeFailed {
624 format: "remoting_command",
625 message: "Incomplete serialize_type field".to_string(),
626 },
627 ));
628 }
629
630 let ori_header_length = cmd_data.get_i32();
632 let header_length = parse_header_length(ori_header_length);
633
634 if header_length > total_size - SERIALIZE_TYPE_SIZE {
636 return Err(rocketmq_error::RocketMQError::Serialization(
637 rocketmq_error::SerializationError::DecodeFailed {
638 format: "remoting_command",
639 message: format!(
640 "Invalid header length {header_length}, total size {total_size}"
641 ),
642 },
643 ));
644 }
645
646 let protocol_type = parse_serialize_type(ori_header_length)?;
647
648 let mut header_data = cmd_data.split_to(header_length);
650
651 let mut cmd =
653 RemotingCommand::header_decode(&mut header_data, header_length, protocol_type)?;
654
655 if let Some(ref mut cmd) = cmd {
657 let body_length = total_size - SERIALIZE_TYPE_SIZE - header_length;
658 if body_length > 0 {
659 if cmd_data.remaining() >= body_length {
660 cmd.set_body_mut_ref(cmd_data.split_to(body_length).freeze());
661 } else {
662 return Err(rocketmq_error::RocketMQError::Serialization(
663 rocketmq_error::SerializationError::DecodeFailed {
664 format: "remoting_command",
665 message: format!(
666 "Insufficient body data: expected {body_length}, available {}",
667 cmd_data.remaining()
668 ),
669 },
670 ));
671 }
672 }
673 }
674
675 Ok(cmd)
676 }
677
678 #[inline]
680 pub fn header_decode(
681 src: &mut BytesMut,
682 header_length: usize,
683 type_: SerializeType,
684 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
685 match type_ {
686 SerializeType::JSON => {
687 #[cfg(feature = "simd")]
689 let cmd = {
690 let mut slice = src.split_to(header_length).to_vec();
691 simd_json::from_slice::<RemotingCommand>(&mut slice).map_err(|error| {
692 rocketmq_error::RocketMQError::Serialization(
693 rocketmq_error::SerializationError::DecodeFailed {
694 format: "json",
695 message: format!("SIMD JSON deserialization error: {error}"),
696 },
697 )
698 })?
699 };
700
701 #[cfg(not(feature = "simd"))]
702 let cmd =
703 SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
704 rocketmq_error::RocketMQError::Serialization(
705 rocketmq_error::SerializationError::DecodeFailed {
706 format: "json",
707 message: format!("JSON deserialization error: {error}"),
708 },
709 )
710 })?;
711
712 Ok(Some(cmd.set_serialize_type(SerializeType::JSON)))
713 }
714 SerializeType::ROCKETMQ => {
715 let cmd = RocketMQSerializable::rocket_mq_protocol_decode(src, header_length)?;
717 Ok(Some(cmd.set_serialize_type(SerializeType::ROCKETMQ)))
718 }
719 }
720 }
721
722 #[inline]
723 pub fn get_body(&self) -> Option<&Bytes> {
724 self.body.as_ref()
725 }
726
727 #[inline]
728 pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
729 self.body.as_mut()
730 }
731
732 #[inline]
733 pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
734 ((protocol_type.get_code() as i32) << 24) | (header_length & 0x00FFFFFF)
735 }
736
737 #[inline]
738 pub fn code(&self) -> i32 {
739 self.code
740 }
741
742 #[inline]
743 pub fn request_code(&self) -> RequestCode {
744 RequestCode::from(self.code)
745 }
746
747 #[inline]
748 pub fn code_ref(&self) -> &i32 {
749 &self.code
750 }
751
752 #[inline]
753 pub fn language(&self) -> LanguageCode {
754 self.language
755 }
756
757 #[inline]
758 pub fn version(&self) -> i32 {
759 self.version
760 }
761
762 pub fn rocketmq_version(&self) -> RocketMqVersion {
763 RocketMqVersion::from_ordinal(self.version as u32)
764 }
765
766 #[inline]
767 pub fn opaque(&self) -> i32 {
768 self.opaque
769 }
770
771 #[inline]
772 pub fn flag(&self) -> i32 {
773 self.flag
774 }
775
776 #[inline]
777 pub fn remark(&self) -> Option<&CheetahString> {
778 self.remark.as_ref()
779 }
780
781 #[inline]
782 pub fn ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
783 self.ext_fields.as_ref()
784 }
785
786 #[inline]
787 pub fn body(&self) -> Option<&Bytes> {
788 self.body.as_ref()
789 }
790
791 #[inline]
792 pub fn take_body(&mut self) -> Option<Bytes> {
793 self.body.take()
794 }
795
796 #[inline]
797 pub fn suspended(&self) -> bool {
798 self.suspended
799 }
800
801 #[inline]
802 pub fn serialize_type(&self) -> SerializeType {
803 self.serialize_type
804 }
805
806 pub fn decode_command_custom_header<T>(&self) -> rocketmq_error::RocketMQResult<T>
807 where
808 T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
809 {
810 match self.ext_fields {
811 None => Err(rocketmq_error::RocketMQError::Serialization(
812 rocketmq_error::SerializationError::DecodeFailed {
813 format: "header",
814 message: "ExtFields is None".to_string(),
815 },
816 )),
817 Some(ref header) => T::from(header),
818 }
819 }
820
821 pub fn decode_command_custom_header_fast<T>(&self) -> rocketmq_error::RocketMQResult<T>
822 where
823 T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
824 T: Default + CommandCustomHeader,
825 {
826 match self.ext_fields {
827 None => Err(rocketmq_error::RocketMQError::Serialization(
828 rocketmq_error::SerializationError::DecodeFailed {
829 format: "header",
830 message: "ExtFields is None".to_string(),
831 },
832 )),
833 Some(ref header) => {
834 let mut target = T::default();
835 if target.support_fast_codec() {
836 target.decode_fast(header)?;
837 Ok(target)
838 } else {
839 T::from(header)
840 }
841 }
842 }
843 }
844
845 #[inline]
846 pub fn is_response_type(&self) -> bool {
847 let bits = 1 << Self::RPC_TYPE;
848 (self.flag & bits) == bits
849 }
850
851 #[inline]
852 pub fn is_oneway_rpc(&self) -> bool {
853 let bits = 1 << Self::RPC_ONEWAY;
854 (self.flag & bits) == bits
855 }
856
857 pub fn get_type(&self) -> RemotingCommandType {
858 if self.is_response_type() {
859 RemotingCommandType::RESPONSE
860 } else {
861 RemotingCommandType::REQUEST
862 }
863 }
864
865 #[inline]
866 pub fn with_opaque(&mut self, opaque: i32) -> &mut Self {
867 self.opaque = opaque;
868 self
869 }
870
871 pub fn add_ext_field(
872 &mut self,
873 key: impl Into<CheetahString>,
874 value: impl Into<CheetahString>,
875 ) -> &mut Self {
876 if let Some(ref mut ext) = self.ext_fields {
877 ext.insert(key.into(), value.into());
878 }
879 self
880 }
881
882 #[inline]
883 pub fn with_code(&mut self, code: impl Into<i32>) -> &mut Self {
884 self.code = code.into();
885 self
886 }
887
888 #[inline]
889 pub fn with_remark(&mut self, remark: impl Into<CheetahString>) -> &mut Self {
890 self.remark = Some(remark.into());
891 self
892 }
893
894 #[inline]
895 pub fn get_ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
896 self.ext_fields.as_ref()
897 }
898
899 pub fn read_custom_header_ref<T>(&self) -> Option<&T>
900 where
901 T: CommandCustomHeader + Sync + Send + 'static,
902 {
903 match self.command_custom_header.as_ref() {
904 None => None,
905 Some(value) => value.as_ref().as_any().downcast_ref::<T>(),
906 }
907 }
908
909 pub fn read_custom_header_ref_unchecked<T>(&self) -> &T
910 where
911 T: CommandCustomHeader + Sync + Send + 'static,
912 {
913 match self.command_custom_header.as_ref() {
914 None => unsafe { hint::unreachable_unchecked() },
915 Some(value) => value.as_ref().as_any().downcast_ref::<T>().unwrap(),
916 }
917 }
918
919 pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
920 where
921 T: CommandCustomHeader + Sync + Send + 'static,
922 {
923 match self.command_custom_header.as_mut() {
924 None => None,
925 Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>(),
926 }
927 }
928
929 pub fn read_custom_header_mut_from_ref<T>(&self) -> Option<&mut T>
930 where
931 T: CommandCustomHeader + Sync + Send + 'static,
932 {
933 match self.command_custom_header.as_ref() {
934 None => None,
935 Some(value) => value.mut_from_ref().as_any_mut().downcast_mut::<T>(),
936 }
937 }
938
939 pub fn read_custom_header_mut_unchecked<T>(&mut self) -> &mut T
940 where
941 T: CommandCustomHeader + Sync + Send + 'static,
942 {
943 match self.command_custom_header.as_mut() {
944 None => unsafe { hint::unreachable_unchecked() },
945 Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>().unwrap(),
946 }
947 }
948
949 pub fn command_custom_header_ref(&self) -> Option<&dyn CommandCustomHeader> {
950 match self.command_custom_header.as_ref() {
951 None => None,
952 Some(value) => Some(value.as_ref().as_ref()),
953 }
954 }
955
956 pub fn command_custom_header_mut(&mut self) -> Option<&mut dyn CommandCustomHeader> {
957 match self.command_custom_header.as_mut() {
958 None => None,
959 Some(value) => Some(value.as_mut().as_mut()),
960 }
961 }
962
963 pub fn create_new_request_id() -> i32 {
964 requestId.fetch_add(1, Ordering::AcqRel)
965 }
966
967 #[inline]
968 pub fn add_ext_field_if_not_exist(
969 &mut self,
970 key: impl Into<CheetahString>,
971 value: impl Into<CheetahString>,
972 ) {
973 if let Some(ref mut ext) = self.ext_fields {
974 ext.entry(key.into()).or_insert(value.into());
975 }
976 }
977}
978
/// Extracts the header length stored in the low 24 bits of the combined
/// serialize-type/header-length word.
#[inline]
pub fn parse_header_length(size: i32) -> usize {
    const HEADER_LENGTH_MASK: i32 = 0x00FF_FFFF;
    let header_length = size & HEADER_LENGTH_MASK;
    header_length as usize
}
984
985#[inline]
987pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
988 ((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
989}
990
991#[inline]
993pub fn parse_serialize_type(size: i32) -> rocketmq_error::RocketMQResult<SerializeType> {
994 let code = (size >> 24) as u8;
995 SerializeType::value_of(code).ok_or({
996 rocketmq_error::RocketMQError::Protocol(
997 rocketmq_error::ProtocolError::UnsupportedSerializationType {
998 serialize_type: code,
999 },
1000 )
1001 })
1002}
1003
1004impl AsRef<RemotingCommand> for RemotingCommand {
1005 #[inline]
1006 fn as_ref(&self) -> &RemotingCommand {
1007 self
1008 }
1009}
1010
1011impl AsMut<RemotingCommand> for RemotingCommand {
1012 #[inline]
1013 fn as_mut(&mut self) -> &mut RemotingCommand {
1014 self
1015 }
1016}
1017
#[cfg(test)]
mod tests {
    use super::*;

    /// The JSON header must list the fields in declaration order with the
    /// wire names `extFields` and `serializeTypeCurrentRPC`.
    #[test]
    fn test_remoting_command() {
        let command = RemotingCommand::create_remoting_command(1)
            .set_code(1)
            .set_language(LanguageCode::JAVA)
            .set_opaque(1)
            .set_flag(1)
            .set_ext_fields(HashMap::new())
            .set_remark_option(Some("remark".to_string()));

        let expected = r#"{"code":1,"language":"JAVA","version":0,"opaque":1,"flag":1,"remark":"remark","extFields":{},"serializeTypeCurrentRPC":"JSON"}"#;
        let actual = serde_json::to_string(&command).unwrap();

        assert_eq!(expected, actual);
    }

    /// Header lengths that fit in 24 bits pass through unchanged when
    /// marked as JSON.
    #[test]
    fn test_mark_serialize_type() {
        let marked = RemotingCommand::mark_serialize_type(261, SerializeType::JSON);
        assert_eq!(marked, 261);

        let marked = RemotingCommand::mark_serialize_type(16777215, SerializeType::JSON);
        assert_eq!(marked, 16777215);

        // Prints the opaque ids handed out to four consecutive defaults.
        for _ in 0..4 {
            println!("i={}", RemotingCommand::default().opaque);
        }
    }
}