1use std::collections::HashMap;
16use std::fmt;
17use std::hint;
18use std::sync::atomic::AtomicI32;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::BufMut;
24use bytes::Bytes;
25use bytes::BytesMut;
26use cheetah_string::CheetahString;
27use rocketmq_common::common::mq_version::RocketMqVersion;
28use rocketmq_common::common::mq_version::CURRENT_VERSION;
29#[cfg(not(feature = "simd"))]
30use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
31use rocketmq_common::EnvUtils::EnvUtils;
32use rocketmq_rust::ArcMut;
33use serde::Deserialize;
34use serde::Serialize;
35use tracing::error;
36
37use super::RemotingCommandType;
38use super::SerializeType;
39use crate::code::request_code::RequestCode;
40use crate::code::response_code::RemotingSysResponseCode;
41use crate::protocol::command_custom_header::CommandCustomHeader;
42use crate::protocol::command_custom_header::FromMap;
43use crate::protocol::LanguageCode;
44use crate::rocketmq_serializable::RocketMQSerializable;
45
46pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";
47pub const SERIALIZE_TYPE_ENV: &str = "ROCKETMQ_SERIALIZE_TYPE";
48pub const REMOTING_VERSION_KEY: &str = "rocketmq.remoting.version";
49
50static REQUEST_ID: std::sync::LazyLock<Arc<AtomicI32>> = std::sync::LazyLock::new(|| Arc::new(AtomicI32::new(0)));
51
52static CONFIG_VERSION: std::sync::LazyLock<i32> = std::sync::LazyLock::new(|| {
53 EnvUtils::get_property(REMOTING_VERSION_KEY)
54 .unwrap_or_else(|| (CURRENT_VERSION as i32).to_string())
55 .parse::<i32>()
56 .unwrap_or(CURRENT_VERSION as i32)
57});
58
59pub static SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER: std::sync::LazyLock<SerializeType> = std::sync::LazyLock::new(|| {
60 let protocol = std::env::var(SERIALIZE_TYPE_PROPERTY)
61 .unwrap_or_else(|_| std::env::var(SERIALIZE_TYPE_ENV).unwrap_or_else(|_| "".to_string()));
62 match protocol.as_str() {
63 "JSON" => SerializeType::JSON,
64 "ROCKETMQ" => SerializeType::ROCKETMQ,
65 _ => SerializeType::JSON,
66 }
67});
68
69fn set_cmd_version(cmd: &mut RemotingCommand) {
70 cmd.set_version_ref(*CONFIG_VERSION);
71}
72
73#[derive(Serialize, Deserialize)]
74pub struct RemotingCommand {
75 code: i32,
76 language: LanguageCode,
77 version: i32,
78 opaque: i32,
79
80 flag: i32,
86 remark: Option<CheetahString>,
87
88 #[serde(rename = "extFields")]
89 ext_fields: Option<HashMap<CheetahString, CheetahString>>,
90
91 #[serde(skip)]
92 body: Option<Bytes>,
93 #[serde(skip)]
94 suspended: bool,
95 #[serde(skip)]
96 command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
97 #[serde(rename = "serializeTypeCurrentRPC")]
98 serialize_type: SerializeType,
99}
100
101impl Clone for RemotingCommand {
102 fn clone(&self) -> Self {
103 Self {
104 code: self.code,
105 language: self.language,
106 version: self.version,
107 opaque: self.opaque,
108 flag: self.flag,
109 remark: self.remark.clone(),
110 ext_fields: self.ext_fields.clone(),
111 body: self.body.clone(),
112 suspended: self.suspended,
113 command_custom_header: self.command_custom_header.clone(),
114 serialize_type: self.serialize_type,
115 }
116 }
117}
118
119impl fmt::Display for RemotingCommand {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 write!(
122 f,
123 "RemotingCommand [code={}, language={}, version={}, opaque={}, flag(B)={:b}, remark={}, extFields={:?}, \
124 serializeTypeCurrentRPC={}]",
125 self.code,
126 self.language,
127 self.version,
128 self.opaque,
129 self.flag,
130 self.remark.as_ref().unwrap_or(&CheetahString::default()),
131 self.ext_fields,
132 self.serialize_type
133 )
134 }
135}
136
137impl Default for RemotingCommand {
138 fn default() -> Self {
139 let opaque = REQUEST_ID.fetch_add(1, Ordering::AcqRel);
140 RemotingCommand {
141 code: 0,
142 language: LanguageCode::RUST, version: 0,
144 opaque,
145 flag: 0,
146 remark: None,
147 ext_fields: None,
148 body: None,
149 suspended: false,
150 command_custom_header: None,
151 serialize_type: *SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER,
152 }
153 }
154}
155
156impl RemotingCommand {
157 pub(crate) const RPC_ONEWAY: i32 = 1;
158 pub(crate) const RPC_TYPE: i32 = 0;
159}
160
161impl RemotingCommand {
162 pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
163 Self::default().set_code(code).set_body(body)
164 }
165
166 pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
167 where
168 T: CommandCustomHeader + Sync + Send + 'static,
169 {
170 let mut command = Self::default().set_code(code.into()).set_command_custom_header(header);
171 set_cmd_version(&mut command);
172 command
173 }
174
175 pub fn create_remoting_command(code: impl Into<i32>) -> Self {
176 let command = Self::default();
177 command.set_code(code.into())
178 }
179
180 pub fn get_and_add() -> i32 {
181 REQUEST_ID.fetch_add(1, Ordering::AcqRel)
182 }
183
184 pub fn create_response_command_with_code(code: impl Into<i32>) -> Self {
185 Self::default().set_code(code).mark_response_type()
186 }
187
188 pub fn create_response_command_with_code_remark(code: impl Into<i32>, remark: impl Into<CheetahString>) -> Self {
189 Self::default()
190 .set_code(code)
191 .set_remark_option(Some(remark.into()))
192 .mark_response_type()
193 }
194
195 pub fn create_response_command() -> Self {
196 Self::default()
197 .set_code(RemotingSysResponseCode::Success)
198 .mark_response_type()
199 }
200
201 pub fn create_response_command_with_header(header: impl CommandCustomHeader + Sync + Send + 'static) -> Self {
202 Self::default()
203 .set_code(RemotingSysResponseCode::Success)
204 .set_command_custom_header(header)
205 .mark_response_type()
206 }
207
208 pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
209 where
210 T: CommandCustomHeader + Sync + Send + 'static,
211 {
212 self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
213 self
214 }
215
216 pub fn set_command_custom_header_origin(
217 mut self,
218 command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
219 ) -> Self {
220 self.command_custom_header = command_custom_header;
221 self
222 }
223
224 pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
225 where
226 T: CommandCustomHeader + Sync + Send + 'static,
227 {
228 self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
229 }
230
231 pub fn set_code(mut self, code: impl Into<i32>) -> Self {
232 self.code = code.into();
233 self
234 }
235
236 pub fn set_code_ref(&mut self, code: impl Into<i32>) {
237 self.code = code.into();
238 }
239
240 pub fn set_code_mut(&mut self, code: impl Into<i32>) -> &mut Self {
241 self.code = code.into();
242 self
243 }
244
245 pub fn set_language(mut self, language: LanguageCode) -> Self {
246 self.language = language;
247 self
248 }
249
250 pub fn set_version_ref(&mut self, version: i32) {
251 self.version = version;
252 }
253
254 pub fn set_version(mut self, version: i32) -> Self {
255 self.version = version;
256 self
257 }
258
259 #[inline]
260 pub fn set_opaque(mut self, opaque: i32) -> Self {
261 self.opaque = opaque;
262 self
263 }
264
265 #[inline]
266 pub fn set_opaque_mut(&mut self, opaque: i32) {
267 self.opaque = opaque;
268 }
269
270 #[inline]
271 pub fn set_flag(mut self, flag: i32) -> Self {
272 self.flag = flag;
273 self
274 }
275
276 #[inline]
277 pub fn set_remark_option(mut self, remark: Option<impl Into<CheetahString>>) -> Self {
278 self.remark = remark.map(|item| item.into());
279 self
280 }
281
282 #[inline]
283 pub fn set_remark(mut self, remark: impl Into<CheetahString>) -> Self {
284 self.remark = Some(remark.into());
285 self
286 }
287
288 #[inline]
289 pub fn set_remark_option_mut(&mut self, remark: Option<impl Into<CheetahString>>) {
290 self.remark = remark.map(|item| item.into());
291 }
292
293 #[inline]
294 pub fn set_remark_mut(&mut self, remark: impl Into<CheetahString>) {
295 self.remark = Some(remark.into());
296 }
297
298 #[inline]
299 pub fn set_ext_fields(mut self, ext_fields: HashMap<CheetahString, CheetahString>) -> Self {
300 self.ext_fields = Some(ext_fields);
301 self
302 }
303
304 #[inline]
305 pub fn set_body(mut self, body: impl Into<Bytes>) -> Self {
306 self.body = Some(body.into());
307 self
308 }
309
310 #[inline]
311 pub fn set_body_mut_ref(&mut self, body: impl Into<Bytes>) {
312 self.body = Some(body.into());
313 }
314
315 #[inline]
316 pub fn set_suspended(mut self, suspended: bool) -> Self {
317 self.suspended = suspended;
318 self
319 }
320
321 #[inline]
322 pub fn set_suspended_ref(&mut self, suspended: bool) {
323 self.suspended = suspended;
324 }
325
326 #[inline]
327 pub fn set_serialize_type(mut self, serialize_type: SerializeType) -> Self {
328 self.serialize_type = serialize_type;
329 self
330 }
331
332 #[inline]
333 pub fn mark_response_type(mut self) -> Self {
334 let mark = 1 << Self::RPC_TYPE;
335 self.flag |= mark;
336 self
337 }
338
339 #[inline]
340 pub fn mark_response_type_ref(&mut self) {
341 let mark = 1 << Self::RPC_TYPE;
342 self.flag |= mark;
343 }
344
345 #[inline]
346 pub fn mark_oneway_rpc(mut self) -> Self {
347 let mark = 1 << Self::RPC_ONEWAY;
348 self.flag |= mark;
349 self
350 }
351
352 #[inline]
353 pub fn mark_oneway_rpc_ref(&mut self) {
354 let mark = 1 << Self::RPC_ONEWAY;
355 self.flag |= mark;
356 }
357
358 #[inline]
359 pub fn get_serialize_type(&self) -> SerializeType {
360 self.serialize_type
361 }
362
363 #[inline]
365 pub fn header_encode(&mut self) -> Option<Bytes> {
366 self.make_custom_header_to_net();
367 match self.serialize_type {
368 SerializeType::ROCKETMQ => Some(RocketMQSerializable::rocket_mq_protocol_encode_bytes(self)),
369 SerializeType::JSON => {
370 #[cfg(feature = "simd")]
371 {
372 match simd_json::to_vec(self) {
373 Ok(value) => Some(Bytes::from(value)),
374 Err(e) => {
375 error!("Failed to encode JSON header with simd-json: {}", e);
376 None
377 }
378 }
379 }
380 #[cfg(not(feature = "simd"))]
381 {
382 match serde_json::to_vec(self) {
383 Ok(value) => Some(Bytes::from(value)),
384 Err(e) => {
385 error!("Failed to encode JSON header: {}", e);
386 None
387 }
388 }
389 }
390 }
391 }
392 }
393
394 #[inline]
396 pub fn encode_header(&mut self) -> Option<Bytes> {
397 let body_length = self.body.as_ref().map_or(0, |b| b.len());
398 self.encode_header_with_body_length(body_length)
399 }
400
401 #[inline]
403 pub fn encode_header_with_body_length(&mut self, body_length: usize) -> Option<Bytes> {
404 let header_data = self.header_encode()?;
406 let header_len = header_data.len();
407
408 let frame_header_size = 8;
410 let total_length = 4 + header_len + body_length; let mut result = BytesMut::with_capacity(frame_header_size + header_len);
414
415 result.put_i32(total_length as i32);
417
418 result.put_i32(mark_protocol_type(header_len as i32, self.serialize_type));
420
421 result.put(header_data);
423
424 Some(result.freeze())
425 }
426
427 #[inline]
429 pub fn make_custom_header_to_net(&mut self) {
430 if let Some(header) = &self.command_custom_header {
431 if let Some(header_map) = header.to_map() {
432 match &mut self.ext_fields {
433 None => {
434 self.ext_fields = Some(header_map);
435 }
436 Some(ext) => {
437 for (key, value) in header_map {
439 ext.insert(key, value);
440 }
441 }
442 }
443 }
444 }
445 }
446
447 #[inline]
448 pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
449 match self.serialize_type {
450 SerializeType::JSON => {
451 self.fast_encode_json(dst);
452 }
453 SerializeType::ROCKETMQ => {
454 self.fast_encode_rocketmq(dst);
455 }
456 }
457 }
458
459 #[inline]
461 fn fast_encode_json(&mut self, dst: &mut BytesMut) {
462 self.make_custom_header_to_net();
463
464 let estimated_header_size = self.estimate_json_header_size();
466 let body_length = self.body.as_ref().map_or(0, |b| b.len());
467
468 dst.reserve(8 + estimated_header_size + body_length);
470
471 #[cfg(feature = "simd")]
473 let encode_result = simd_json::to_vec(self);
474
475 #[cfg(not(feature = "simd"))]
476 let encode_result = serde_json::to_vec(self);
477
478 match encode_result {
479 Ok(header_bytes) => {
480 let header_length = header_bytes.len() as i32;
481 let body_length = body_length as i32;
482 let total_length = 4 + header_length + body_length;
483
484 dst.put_i32(total_length);
486 dst.put_i32(RemotingCommand::mark_serialize_type(header_length, SerializeType::JSON));
487
488 dst.put_slice(&header_bytes);
490 }
491 Err(e) => {
492 error!("Failed to encode JSON header: {}", e);
493 dst.put_i32(4); dst.put_i32(RemotingCommand::mark_serialize_type(0, SerializeType::JSON));
496 }
497 }
498 }
499
500 #[inline]
502 fn fast_encode_rocketmq(&mut self, dst: &mut BytesMut) {
503 let begin_index = dst.len();
504
505 dst.reserve(8);
507 dst.put_i64(0); if let Some(header) = self.command_custom_header_ref() {
511 if !header.support_fast_codec() {
512 self.make_custom_header_to_net();
513 }
514 }
515
516 let header_size = RocketMQSerializable::rocketmq_protocol_encode(self, dst);
518 let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
519
520 let serialize_type = RemotingCommand::mark_serialize_type(header_size as i32, SerializeType::ROCKETMQ);
522
523 let total_length = (header_size as i32 + body_length).to_be_bytes();
525 let serialize_type_bytes = serialize_type.to_be_bytes();
526
527 dst[begin_index..begin_index + 4].copy_from_slice(&total_length);
528 dst[begin_index + 4..begin_index + 8].copy_from_slice(&serialize_type_bytes);
529 }
530
531 #[inline]
534 fn estimate_json_header_size(&self) -> usize {
535 let mut size = 100; if let Some(ref remark) = self.remark {
538 size += remark.len() + 20; }
540
541 if let Some(ref ext) = self.ext_fields {
542 size += ext.iter().map(|(k, v)| k.len() + v.len() + 30).sum::<usize>();
544 }
545
546 size
547 }
548
549 #[inline]
551 pub fn decode(src: &mut BytesMut) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
552 const FRAME_HEADER_SIZE: usize = 4;
553 const SERIALIZE_TYPE_SIZE: usize = 4;
554 const MIN_PAYLOAD_SIZE: usize = SERIALIZE_TYPE_SIZE; let available = src.len();
557
558 if available < FRAME_HEADER_SIZE {
560 return Ok(None);
561 }
562
563 let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
565
566 if total_size > 16 * 1024 * 1024 {
568 return Err(rocketmq_error::RocketMQError::Serialization(
569 rocketmq_error::SerializationError::DecodeFailed {
570 format: "remoting_command",
571 message: format!("Frame size {total_size} exceeds maximum allowed (16MB)"),
572 },
573 ));
574 }
575
576 let full_frame_size = total_size + FRAME_HEADER_SIZE;
578 if available < full_frame_size {
579 return Ok(None);
580 }
581
582 if total_size < MIN_PAYLOAD_SIZE {
584 return Err(rocketmq_error::RocketMQError::Serialization(
585 rocketmq_error::SerializationError::DecodeFailed {
586 format: "remoting_command",
587 message: format!("Invalid total_size {total_size}, minimum required is {MIN_PAYLOAD_SIZE}"),
588 },
589 ));
590 }
591
592 let mut cmd_data = src.split_to(full_frame_size);
594 cmd_data.advance(FRAME_HEADER_SIZE); if cmd_data.remaining() < SERIALIZE_TYPE_SIZE {
598 return Err(rocketmq_error::RocketMQError::Serialization(
599 rocketmq_error::SerializationError::DecodeFailed {
600 format: "remoting_command",
601 message: "Incomplete serialize_type field".to_string(),
602 },
603 ));
604 }
605
606 let ori_header_length = cmd_data.get_i32();
608 let header_length = parse_header_length(ori_header_length);
609
610 if header_length > total_size - SERIALIZE_TYPE_SIZE {
612 return Err(rocketmq_error::RocketMQError::Serialization(
613 rocketmq_error::SerializationError::DecodeFailed {
614 format: "remoting_command",
615 message: format!("Invalid header length {header_length}, total size {total_size}"),
616 },
617 ));
618 }
619
620 let protocol_type = parse_serialize_type(ori_header_length)?;
621
622 let mut header_data = cmd_data.split_to(header_length);
624
625 let mut cmd = RemotingCommand::header_decode(&mut header_data, header_length, protocol_type)?;
627
628 if let Some(ref mut cmd) = cmd {
630 let body_length = total_size - SERIALIZE_TYPE_SIZE - header_length;
631 if body_length > 0 {
632 if cmd_data.remaining() >= body_length {
633 cmd.set_body_mut_ref(cmd_data.split_to(body_length).freeze());
634 } else {
635 return Err(rocketmq_error::RocketMQError::Serialization(
636 rocketmq_error::SerializationError::DecodeFailed {
637 format: "remoting_command",
638 message: format!(
639 "Insufficient body data: expected {body_length}, available {}",
640 cmd_data.remaining()
641 ),
642 },
643 ));
644 }
645 }
646 }
647
648 Ok(cmd)
649 }
650
651 #[inline]
653 pub fn header_decode(
654 src: &mut BytesMut,
655 header_length: usize,
656 type_: SerializeType,
657 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
658 match type_ {
659 SerializeType::JSON => {
660 #[cfg(feature = "simd")]
662 let cmd = {
663 let mut slice = src.split_to(header_length).to_vec();
664 simd_json::from_slice::<RemotingCommand>(&mut slice).map_err(|error| {
665 rocketmq_error::RocketMQError::Serialization(rocketmq_error::SerializationError::DecodeFailed {
666 format: "json",
667 message: format!("SIMD JSON deserialization error: {error}"),
668 })
669 })?
670 };
671
672 #[cfg(not(feature = "simd"))]
673 let cmd = SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
674 rocketmq_error::RocketMQError::Serialization(rocketmq_error::SerializationError::DecodeFailed {
675 format: "json",
676 message: format!("JSON deserialization error: {error}"),
677 })
678 })?;
679
680 Ok(Some(cmd.set_serialize_type(SerializeType::JSON)))
681 }
682 SerializeType::ROCKETMQ => {
683 let cmd = RocketMQSerializable::rocket_mq_protocol_decode(src, header_length)?;
685 Ok(Some(cmd.set_serialize_type(SerializeType::ROCKETMQ)))
686 }
687 }
688 }
689
690 #[inline]
691 pub fn get_body(&self) -> Option<&Bytes> {
692 self.body.as_ref()
693 }
694
695 #[inline]
696 pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
697 self.body.as_mut()
698 }
699
700 #[inline]
701 pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
702 ((protocol_type.get_code() as i32) << 24) | (header_length & 0x00FFFFFF)
703 }
704
705 #[inline]
706 pub fn code(&self) -> i32 {
707 self.code
708 }
709
710 #[inline]
711 pub fn request_code(&self) -> RequestCode {
712 RequestCode::from(self.code)
713 }
714
715 #[inline]
716 pub fn code_ref(&self) -> &i32 {
717 &self.code
718 }
719
720 #[inline]
721 pub fn language(&self) -> LanguageCode {
722 self.language
723 }
724
725 #[inline]
726 pub fn version(&self) -> i32 {
727 self.version
728 }
729
730 pub fn rocketmq_version(&self) -> RocketMqVersion {
731 RocketMqVersion::from_ordinal(self.version as u32)
732 }
733
734 #[inline]
735 pub fn opaque(&self) -> i32 {
736 self.opaque
737 }
738
739 #[inline]
740 pub fn flag(&self) -> i32 {
741 self.flag
742 }
743
744 #[inline]
745 pub fn remark(&self) -> Option<&CheetahString> {
746 self.remark.as_ref()
747 }
748
749 #[inline]
750 pub fn ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
751 self.ext_fields.as_ref()
752 }
753
754 #[inline]
755 pub fn body(&self) -> Option<&Bytes> {
756 self.body.as_ref()
757 }
758
759 #[inline]
760 pub fn take_body(&mut self) -> Option<Bytes> {
761 self.body.take()
762 }
763
764 #[inline]
765 pub fn suspended(&self) -> bool {
766 self.suspended
767 }
768
769 #[inline]
770 pub fn serialize_type(&self) -> SerializeType {
771 self.serialize_type
772 }
773
774 pub fn decode_command_custom_header<T>(&self) -> rocketmq_error::RocketMQResult<T>
775 where
776 T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
777 {
778 match self.ext_fields {
779 None => Err(rocketmq_error::RocketMQError::Serialization(
780 rocketmq_error::SerializationError::DecodeFailed {
781 format: "header",
782 message: "ExtFields is None".to_string(),
783 },
784 )),
785 Some(ref header) => T::from(header),
786 }
787 }
788
789 pub fn decode_command_custom_header_fast<T>(&self) -> rocketmq_error::RocketMQResult<T>
790 where
791 T: FromMap<Target = T, Error = rocketmq_error::RocketMQError>,
792 T: Default + CommandCustomHeader,
793 {
794 match self.ext_fields {
795 None => Err(rocketmq_error::RocketMQError::Serialization(
796 rocketmq_error::SerializationError::DecodeFailed {
797 format: "header",
798 message: "ExtFields is None".to_string(),
799 },
800 )),
801 Some(ref header) => {
802 let mut target = T::default();
803 if target.support_fast_codec() {
804 target.decode_fast(header)?;
805 Ok(target)
806 } else {
807 T::from(header)
808 }
809 }
810 }
811 }
812
813 #[inline]
814 pub fn is_response_type(&self) -> bool {
815 let bits = 1 << Self::RPC_TYPE;
816 (self.flag & bits) == bits
817 }
818
819 #[inline]
820 pub fn is_oneway_rpc(&self) -> bool {
821 let bits = 1 << Self::RPC_ONEWAY;
822 (self.flag & bits) == bits
823 }
824
825 pub fn get_type(&self) -> RemotingCommandType {
826 if self.is_response_type() {
827 RemotingCommandType::RESPONSE
828 } else {
829 RemotingCommandType::REQUEST
830 }
831 }
832
833 #[inline]
834 pub fn with_opaque(&mut self, opaque: i32) -> &mut Self {
835 self.opaque = opaque;
836 self
837 }
838
839 pub fn add_ext_field(&mut self, key: impl Into<CheetahString>, value: impl Into<CheetahString>) -> &mut Self {
840 if let Some(ref mut ext) = self.ext_fields {
841 ext.insert(key.into(), value.into());
842 }
843 self
844 }
845
846 #[inline]
847 pub fn with_code(&mut self, code: impl Into<i32>) -> &mut Self {
848 self.code = code.into();
849 self
850 }
851
852 #[inline]
853 pub fn with_remark(&mut self, remark: impl Into<CheetahString>) -> &mut Self {
854 self.remark = Some(remark.into());
855 self
856 }
857
858 #[inline]
859 pub fn get_ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
860 self.ext_fields.as_ref()
861 }
862
863 pub fn read_custom_header_ref<T>(&self) -> Option<&T>
864 where
865 T: CommandCustomHeader + Sync + Send + 'static,
866 {
867 match self.command_custom_header.as_ref() {
868 None => None,
869 Some(value) => value.as_ref().as_any().downcast_ref::<T>(),
870 }
871 }
872
873 pub fn read_custom_header_ref_unchecked<T>(&self) -> &T
874 where
875 T: CommandCustomHeader + Sync + Send + 'static,
876 {
877 match self.command_custom_header.as_ref() {
878 None => unsafe { hint::unreachable_unchecked() },
879 Some(value) => value.as_ref().as_any().downcast_ref::<T>().unwrap(),
880 }
881 }
882
883 pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
884 where
885 T: CommandCustomHeader + Sync + Send + 'static,
886 {
887 match self.command_custom_header.as_mut() {
888 None => None,
889 Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>(),
890 }
891 }
892
893 pub fn read_custom_header_mut_from_ref<T>(&self) -> Option<&mut T>
894 where
895 T: CommandCustomHeader + Sync + Send + 'static,
896 {
897 match self.command_custom_header.as_ref() {
898 None => None,
899 Some(value) => value.mut_from_ref().as_any_mut().downcast_mut::<T>(),
900 }
901 }
902
903 pub fn read_custom_header_mut_unchecked<T>(&mut self) -> &mut T
904 where
905 T: CommandCustomHeader + Sync + Send + 'static,
906 {
907 match self.command_custom_header.as_mut() {
908 None => unsafe { hint::unreachable_unchecked() },
909 Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>().unwrap(),
910 }
911 }
912
913 pub fn command_custom_header_ref(&self) -> Option<&dyn CommandCustomHeader> {
914 match self.command_custom_header.as_ref() {
915 None => None,
916 Some(value) => Some(value.as_ref().as_ref()),
917 }
918 }
919
920 pub fn command_custom_header_mut(&mut self) -> Option<&mut dyn CommandCustomHeader> {
921 match self.command_custom_header.as_mut() {
922 None => None,
923 Some(value) => Some(value.as_mut().as_mut()),
924 }
925 }
926
927 pub fn create_new_request_id() -> i32 {
928 REQUEST_ID.fetch_add(1, Ordering::AcqRel)
929 }
930
931 #[inline]
932 pub fn add_ext_field_if_not_exist(&mut self, key: impl Into<CheetahString>, value: impl Into<CheetahString>) {
933 if let Some(ref mut ext) = self.ext_fields {
934 ext.entry(key.into()).or_insert(value.into());
935 }
936 }
937
938 #[inline]
943 pub fn ensure_ext_fields_initialized(&mut self) {
944 if self.ext_fields.is_none() {
945 self.ext_fields = Some(std::collections::HashMap::new());
946 }
947 }
948}
949
950#[inline]
952pub fn parse_header_length(size: i32) -> usize {
953 (size & 0x00FFFFFF) as usize
954}
955
956#[inline]
958pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
959 ((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
960}
961
962#[inline]
964pub fn parse_serialize_type(size: i32) -> rocketmq_error::RocketMQResult<SerializeType> {
965 let code = (size >> 24) as u8;
966 SerializeType::value_of(code).ok_or({
967 rocketmq_error::RocketMQError::Protocol(rocketmq_error::ProtocolError::UnsupportedSerializationType {
968 serialize_type: code,
969 })
970 })
971}
972
973impl AsRef<RemotingCommand> for RemotingCommand {
974 #[inline]
975 fn as_ref(&self) -> &RemotingCommand {
976 self
977 }
978}
979
980impl AsMut<RemotingCommand> for RemotingCommand {
981 #[inline]
982 fn as_mut(&mut self) -> &mut RemotingCommand {
983 self
984 }
985}
986
987#[cfg(test)]
988mod tests {
989 use super::*;
990
991 #[test]
992 fn test_remoting_command() {
993 let command = RemotingCommand::create_remoting_command(1)
994 .set_code(1)
995 .set_language(LanguageCode::JAVA)
996 .set_opaque(1)
997 .set_flag(1)
998 .set_ext_fields(HashMap::new())
999 .set_remark_option(Some("remark".to_string()));
1000
1001 assert_eq!(
1002 "{\"code\":1,\"language\":\"JAVA\",\"version\":0,\"opaque\":1,\"flag\":1,\"remark\":\"remark\",\"\
1003 extFields\":{},\"serializeTypeCurrentRPC\":\"JSON\"}",
1004 serde_json::to_string(&command).unwrap()
1005 );
1006 }
1007
1008 #[test]
1009 fn test_mark_serialize_type() {
1010 let i = RemotingCommand::mark_serialize_type(261, SerializeType::JSON);
1011 assert_eq!(i, 261);
1012
1013 let i = RemotingCommand::mark_serialize_type(16777215, SerializeType::JSON);
1014 assert_eq!(i, 16777215);
1015
1016 println!("i={}", RemotingCommand::default().opaque);
1017 println!("i={}", RemotingCommand::default().opaque);
1018 println!("i={}", RemotingCommand::default().opaque);
1019 println!("i={}", RemotingCommand::default().opaque);
1020 }
1021}