1use std::collections::HashMap;
19use std::fmt;
20use std::hint;
21use std::sync::atomic::AtomicI32;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::sync::Once;
25
26use bytes::Buf;
27use bytes::BufMut;
28use bytes::Bytes;
29use bytes::BytesMut;
30use cheetah_string::CheetahString;
31use lazy_static::lazy_static;
32use rocketmq_common::common::mq_version::RocketMqVersion;
33use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
34use rocketmq_common::EnvUtils::EnvUtils;
35use rocketmq_error::RocketmqError;
36use rocketmq_rust::ArcMut;
37use serde::Deserialize;
38use serde::Serialize;
39use tracing::error;
40
41use super::RemotingCommandType;
42use super::RemotingSerializable;
43use super::SerializeType;
44use crate::code::response_code::RemotingSysResponseCode;
45use crate::protocol::command_custom_header::CommandCustomHeader;
46use crate::protocol::command_custom_header::FromMap;
47use crate::protocol::LanguageCode;
48use crate::rocketmq_serializable::RocketMQSerializable;
49
50pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";
51pub const SERIALIZE_TYPE_ENV: &str = "ROCKETMQ_SERIALIZE_TYPE";
52pub const REMOTING_VERSION_KEY: &str = "rocketmq.remoting.version";
53
54lazy_static! {
55 static ref requestId: Arc<AtomicI32> = Arc::new(AtomicI32::new(0));
56 static ref CONFIG_VERSION: i32 = {
57 EnvUtils::get_property(REMOTING_VERSION_KEY)
58 .unwrap_or(String::from("0"))
59 .parse::<i32>()
60 .unwrap_or(0)
61 };
62 static ref INIT: Once = Once::new();
63 pub static ref SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER: SerializeType = {
64 let protocol = std::env::var(SERIALIZE_TYPE_PROPERTY).unwrap_or_else(|_| {
65 std::env::var(SERIALIZE_TYPE_ENV).unwrap_or_else(|_| "".to_string())
66 });
67 match protocol.as_str() {
68 "JSON" => SerializeType::JSON,
69 "ROCKETMQ" => SerializeType::ROCKETMQ,
70 _ => SerializeType::JSON,
71 }
72 };
73}
74
75fn set_cmd_version(cmd: &mut RemotingCommand) {
76 cmd.set_version_ref(*CONFIG_VERSION);
77}
78
79#[derive(Serialize, Deserialize)]
80pub struct RemotingCommand {
81 code: i32,
82 language: LanguageCode,
83 version: i32,
84 opaque: i32,
85
86 flag: i32,
92 remark: Option<CheetahString>,
93
94 #[serde(rename = "extFields")]
95 ext_fields: Option<HashMap<CheetahString, CheetahString>>,
96
97 #[serde(skip)]
98 body: Option<Bytes>,
99 #[serde(skip)]
100 suspended: bool,
101 #[serde(skip)]
102 command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
103 #[serde(rename = "serializeTypeCurrentRPC")]
104 serialize_type: SerializeType,
105}
106
107impl Clone for RemotingCommand {
108 fn clone(&self) -> Self {
109 Self {
110 code: self.code,
111 language: self.language,
112 version: self.version,
113 opaque: self.opaque,
114 flag: self.flag,
115 remark: self.remark.clone(),
116 ext_fields: self.ext_fields.clone(),
117 body: self.body.clone(),
118 suspended: self.suspended,
119 command_custom_header: self.command_custom_header.clone(),
120 serialize_type: self.serialize_type,
121 }
122 }
123}
124
125impl fmt::Display for RemotingCommand {
126 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127 write!(
128 f,
129 "RemotingCommand [code={}, language={}, version={}, opaque={}, flag(B)={:b}, \
130 remark={}, extFields={:?}, serializeTypeCurrentRPC={}]",
131 self.code,
132 self.language,
133 self.version,
134 self.opaque,
135 self.flag,
136 self.remark.as_ref().unwrap_or(&CheetahString::default()),
137 self.ext_fields,
138 self.serialize_type
139 )
140 }
141}
142
143impl Default for RemotingCommand {
144 fn default() -> Self {
145 let opaque = requestId.fetch_add(1, Ordering::AcqRel);
146 RemotingCommand {
147 code: 0,
148 language: LanguageCode::RUST, version: 0,
150 opaque,
151 flag: 0,
152 remark: None,
153 ext_fields: None,
154 body: None,
155 suspended: false,
156 command_custom_header: None,
157 serialize_type: *SERIALIZE_TYPE_CONFIG_IN_THIS_SERVER,
158 }
159 }
160}
161
162impl RemotingCommand {
163 pub(crate) const RPC_ONEWAY: i32 = 1;
164 pub(crate) const RPC_TYPE: i32 = 0;
165}
166
167impl RemotingCommand {
168 pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
169 Self::default().set_code(code).set_body(body)
170 }
171
172 pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
173 where
174 T: CommandCustomHeader + Sync + Send + 'static,
175 {
176 let mut command = Self::default()
177 .set_code(code.into())
178 .set_command_custom_header(header);
179 set_cmd_version(&mut command);
180 command
181 }
182
183 pub fn create_remoting_command(code: impl Into<i32>) -> Self {
184 let command = Self::default();
185 command.set_code(code.into())
186 }
187
188 pub fn get_and_add() -> i32 {
189 requestId.fetch_add(1, Ordering::AcqRel)
190 }
191
192 pub fn create_response_command_with_code(code: impl Into<i32>) -> Self {
193 Self::default().set_code(code).mark_response_type()
194 }
195
196 pub fn create_response_command_with_code_remark(
197 code: impl Into<i32>,
198 remark: impl Into<CheetahString>,
199 ) -> Self {
200 Self::default()
201 .set_code(code)
202 .set_remark_option(Some(remark.into()))
203 .mark_response_type()
204 }
205
206 pub fn create_response_command() -> Self {
207 Self::default()
208 .set_code(RemotingSysResponseCode::Success)
209 .mark_response_type()
210 }
211
212 pub fn create_response_command_with_header(
213 header: impl CommandCustomHeader + Sync + Send + 'static,
214 ) -> Self {
215 Self::default()
216 .set_code(RemotingSysResponseCode::Success)
217 .set_command_custom_header(header)
218 .mark_response_type()
219 }
220
221 pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
222 where
223 T: CommandCustomHeader + Sync + Send + 'static,
224 {
225 self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
226 self
227 }
228
229 pub fn set_command_custom_header_origin(
230 mut self,
231 command_custom_header: Option<ArcMut<Box<dyn CommandCustomHeader + Send + Sync + 'static>>>,
232 ) -> Self {
233 self.command_custom_header = command_custom_header;
234 self
235 }
236
237 pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
238 where
239 T: CommandCustomHeader + Sync + Send + 'static,
240 {
241 self.command_custom_header = Some(ArcMut::new(Box::new(command_custom_header)));
242 }
243
244 pub fn set_code(mut self, code: impl Into<i32>) -> Self {
245 self.code = code.into();
246 self
247 }
248
249 pub fn set_code_ref(&mut self, code: impl Into<i32>) {
250 self.code = code.into();
251 }
252
253 pub fn set_code_mut(&mut self, code: impl Into<i32>) -> &mut Self {
254 self.code = code.into();
255 self
256 }
257
258 pub fn set_language(mut self, language: LanguageCode) -> Self {
259 self.language = language;
260 self
261 }
262
263 pub fn set_version_ref(&mut self, version: i32) {
264 self.version = version;
265 }
266
267 pub fn set_version(mut self, version: i32) -> Self {
268 self.version = version;
269 self
270 }
271
272 #[inline]
273 pub fn set_opaque(mut self, opaque: i32) -> Self {
274 self.opaque = opaque;
275 self
276 }
277
278 #[inline]
279 pub fn set_opaque_mut(&mut self, opaque: i32) {
280 self.opaque = opaque;
281 }
282
283 #[inline]
284 pub fn set_flag(mut self, flag: i32) -> Self {
285 self.flag = flag;
286 self
287 }
288
289 #[inline]
290 pub fn set_remark_option(mut self, remark: Option<impl Into<CheetahString>>) -> Self {
291 self.remark = remark.map(|item| item.into());
292 self
293 }
294
295 #[inline]
296 pub fn set_remark(mut self, remark: impl Into<CheetahString>) -> Self {
297 self.remark = Some(remark.into());
298 self
299 }
300
301 #[inline]
302 pub fn set_remark_option_mut(&mut self, remark: Option<impl Into<CheetahString>>) {
303 self.remark = remark.map(|item| item.into());
304 }
305
306 #[inline]
307 pub fn set_remark_mut(&mut self, remark: impl Into<CheetahString>) {
308 self.remark = Some(remark.into());
309 }
310
311 #[inline]
312 pub fn set_ext_fields(mut self, ext_fields: HashMap<CheetahString, CheetahString>) -> Self {
313 self.ext_fields = Some(ext_fields);
314 self
315 }
316
317 #[inline]
318 pub fn set_body(mut self, body: impl Into<Bytes>) -> Self {
319 self.body = Some(body.into());
320 self
321 }
322
323 #[inline]
324 pub fn set_body_mut_ref(&mut self, body: impl Into<Bytes>) {
325 self.body = Some(body.into());
326 }
327
328 #[inline]
329 pub fn set_suspended(mut self, suspended: bool) -> Self {
330 self.suspended = suspended;
331 self
332 }
333
334 #[inline]
335 pub fn set_suspended_ref(&mut self, suspended: bool) {
336 self.suspended = suspended;
337 }
338
339 #[inline]
340 pub fn set_serialize_type(mut self, serialize_type: SerializeType) -> Self {
341 self.serialize_type = serialize_type;
342 self
343 }
344
345 #[inline]
346 pub fn mark_response_type(mut self) -> Self {
347 let mark = 1 << Self::RPC_TYPE;
348 self.flag |= mark;
349 self
350 }
351
352 #[inline]
353 pub fn mark_response_type_ref(&mut self) {
354 let mark = 1 << Self::RPC_TYPE;
355 self.flag |= mark;
356 }
357
358 #[inline]
359 pub fn mark_oneway_rpc(mut self) -> Self {
360 let mark = 1 << Self::RPC_ONEWAY;
361 self.flag |= mark;
362 self
363 }
364
365 #[inline]
366 pub fn mark_oneway_rpc_ref(&mut self) {
367 let mark = 1 << Self::RPC_ONEWAY;
368 self.flag |= mark;
369 }
370
371 #[inline]
372 pub fn get_serialize_type(&self) -> SerializeType {
373 self.serialize_type
374 }
375
376 pub fn header_encode(&mut self) -> Option<Bytes> {
377 self.make_custom_header_to_net();
378 if SerializeType::ROCKETMQ == self.serialize_type {
379 Some(RocketMQSerializable::rocket_mq_protocol_encode_bytes(self))
380 } else {
381 match self.encode() {
382 Ok(value) => Some(Bytes::from(value)),
383 Err(e) => {
384 error!("Failed to encode generic: {}", e);
385 None
386 }
387 }
388 }
389 }
390
391 pub fn encode_header(&mut self) -> Option<Bytes> {
392 if let Some(body) = &self.body {
393 let size = body.len();
394 self.encode_header_with_body_length(size)
395 } else {
396 self.encode_header_with_body_length(0)
397 }
398 }
399 pub fn encode_header_with_body_length(&mut self, body_length: usize) -> Option<Bytes> {
400 let mut length = 4;
403
404 let header_data = self.header_encode().unwrap();
406
407 length += header_data.len();
408
409 length += body_length;
411
412 let mut result = BytesMut::with_capacity(4 + length - body_length);
413
414 result.put_i32(length as i32);
416
417 result.put_i32(mark_protocol_type(
419 header_data.len() as i32,
420 self.serialize_type,
421 ));
422
423 result.put(header_data);
425
426 Some(result.freeze())
427 }
428
429 pub fn make_custom_header_to_net(&mut self) {
430 if let Some(header) = &self.command_custom_header {
431 let option = header.to_map();
432
433 match &mut self.ext_fields {
434 None => {
435 self.ext_fields = option;
436 }
437 Some(ext) => {
438 if let Some(val) = option {
439 for (key, value) in &val {
440 ext.insert(key.clone(), value.clone());
441 }
442 }
443 }
444 }
445 }
446 }
447
448 pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
449 match self.serialize_type {
450 SerializeType::JSON => {
451 self.make_custom_header_to_net();
452 let header = match serde_json::to_vec(self) {
453 Ok(value) => Some(value),
454 Err(e) => {
455 error!("Failed to encode generic: {}", e);
456 None
457 }
458 };
459 let header_length = header.as_ref().map_or(0, |h| h.len()) as i32;
460 let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
461 let total_length = 4 + header_length + body_length;
462
463 dst.reserve((total_length + 4) as usize);
464 dst.put_i32(total_length);
465 let serialize_type =
466 RemotingCommand::mark_serialize_type(header_length, SerializeType::JSON);
467 dst.put_i32(serialize_type);
468
469 if let Some(header_inner) = header {
470 dst.put(header_inner.as_slice());
471 }
472 }
473 SerializeType::ROCKETMQ => {
474 let begin_index = dst.len();
475 dst.put_i64(0);
476 if let Some(header) = self.command_custom_header_ref() {
477 if !header.support_fast_codec() {
478 self.make_custom_header_to_net();
479 }
480 }
481 let header_size = RocketMQSerializable::rocketmq_protocol_encode(self, dst);
482 let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
483 let serialize_type = RemotingCommand::mark_serialize_type(
484 header_size as i32,
485 SerializeType::ROCKETMQ,
486 );
487 dst[begin_index..begin_index + 4]
488 .copy_from_slice(&(header_size as i32 + body_length).to_be_bytes());
489 dst[begin_index + 4..begin_index + 8]
490 .copy_from_slice(&serialize_type.to_be_bytes());
491 }
492 }
493 }
494
495 pub fn decode(src: &mut BytesMut) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
496 let read_to = src.len();
497 if read_to < 4 {
498 return Ok(None);
500 }
501 let total_size = i32::from_be_bytes([src[0], src[1], src[2], src[3]]) as usize;
503
504 if read_to < total_size + 4 {
505 return Ok(None);
507 }
508 let mut cmd_data = src.split_to(total_size + 4);
510 cmd_data.advance(4);
512 if cmd_data.remaining() < 4 {
513 return Ok(None);
514 }
515 let ori_header_length = cmd_data.get_i32();
517 let header_length = parse_header_length(ori_header_length);
518 if header_length > total_size - 4 {
519 return Err(RocketmqError::RemotingCommandDecoderError(format!(
520 "Header length {header_length} is greater than total size {total_size}"
521 )));
522 }
523 let protocol_type = parse_serialize_type(ori_header_length)?;
524 let mut header_data = cmd_data.split_to(header_length);
526
527 let mut cmd =
528 RemotingCommand::header_decode(&mut header_data, header_length, protocol_type)?;
529
530 if let Some(cmd) = cmd.as_mut() {
531 if total_size - 4 > header_length {
532 cmd.set_body_mut_ref(cmd_data.split_to(total_size - 4 - header_length).freeze());
533 }
534 }
535 Ok(cmd)
536 }
537
538 pub fn header_decode(
539 src: &mut BytesMut,
540 header_length: usize,
541 type_: SerializeType,
542 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
543 match type_ {
544 SerializeType::JSON => {
545 let cmd =
546 SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
547 RocketmqError::RemotingCommandDecoderError(format!(
549 "Deserialization error: {error}"
550 ))
551 })?;
552
553 Ok(Some(cmd.set_serialize_type(SerializeType::JSON)))
554 }
555 SerializeType::ROCKETMQ => {
556 let cmd = RocketMQSerializable::rocket_mq_protocol_decode(src, header_length)?;
557 Ok(Some(cmd.set_serialize_type(SerializeType::ROCKETMQ)))
558 }
559 }
560 }
561
562 #[inline]
563 pub fn get_body(&self) -> Option<&Bytes> {
564 self.body.as_ref()
565 }
566
567 #[inline]
568 pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
569 self.body.as_mut()
570 }
571
572 #[inline]
573 pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
574 ((protocol_type.get_code() as i32) << 24) | (header_length & 0x00FFFFFF)
575 }
576
577 #[inline]
578 pub fn code(&self) -> i32 {
579 self.code
580 }
581
582 #[inline]
583 pub fn language(&self) -> LanguageCode {
584 self.language
585 }
586
587 #[inline]
588 pub fn version(&self) -> i32 {
589 self.version
590 }
591
592 pub fn rocketmq_version(&self) -> RocketMqVersion {
593 RocketMqVersion::from_ordinal(self.version as u32)
594 }
595
596 #[inline]
597 pub fn opaque(&self) -> i32 {
598 self.opaque
599 }
600
601 #[inline]
602 pub fn flag(&self) -> i32 {
603 self.flag
604 }
605
606 #[inline]
607 pub fn remark(&self) -> Option<&CheetahString> {
608 self.remark.as_ref()
609 }
610
611 #[inline]
612 pub fn ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
613 self.ext_fields.as_ref()
614 }
615
616 #[inline]
617 pub fn body(&self) -> &Option<Bytes> {
618 &self.body
619 }
620
621 #[inline]
622 pub fn take_body(&mut self) -> Option<Bytes> {
623 self.body.take()
624 }
625
626 #[inline]
627 pub fn suspended(&self) -> bool {
628 self.suspended
629 }
630
631 #[inline]
632 pub fn serialize_type(&self) -> SerializeType {
633 self.serialize_type
634 }
635
636 pub fn decode_command_custom_header<T>(&self) -> rocketmq_error::RocketMQResult<T>
637 where
638 T: FromMap<Target = T, Error = RocketmqError>,
639 {
640 match self.ext_fields {
641 None => Err(RocketmqError::DeserializeHeaderError(
642 "ExtFields is None".to_string(),
643 )),
644 Some(ref header) => T::from(header),
645 }
646 }
647
648 pub fn decode_command_custom_header_fast<T>(&self) -> rocketmq_error::RocketMQResult<T>
649 where
650 T: FromMap<Target = T, Error = RocketmqError>,
651 T: Default + CommandCustomHeader,
652 {
653 match self.ext_fields {
654 None => Err(RocketmqError::DeserializeHeaderError(
655 "ExtFields is None".to_string(),
656 )),
657 Some(ref header) => {
658 let mut target = T::default();
659 if target.support_fast_codec() {
660 target.decode_fast(header)?;
661 Ok(target)
662 } else {
663 T::from(header)
664 }
665 }
666 }
667 }
668
669 #[inline]
670 pub fn is_response_type(&self) -> bool {
671 let bits = 1 << Self::RPC_TYPE;
672 (self.flag & bits) == bits
673 }
674
675 #[inline]
676 pub fn is_oneway_rpc(&self) -> bool {
677 let bits = 1 << Self::RPC_ONEWAY;
678 (self.flag & bits) == bits
679 }
680
681 pub fn get_type(&self) -> RemotingCommandType {
682 if self.is_response_type() {
683 RemotingCommandType::RESPONSE
684 } else {
685 RemotingCommandType::REQUEST
686 }
687 }
688
689 #[inline]
690 pub fn with_opaque(&mut self, opaque: i32) -> &mut Self {
691 self.opaque = opaque;
692 self
693 }
694
695 pub fn add_ext_field(
696 &mut self,
697 key: impl Into<CheetahString>,
698 value: impl Into<CheetahString>,
699 ) -> &mut Self {
700 if let Some(ref mut ext) = self.ext_fields {
701 ext.insert(key.into(), value.into());
702 }
703 self
704 }
705
706 #[inline]
707 pub fn with_code(&mut self, code: impl Into<i32>) -> &mut Self {
708 self.code = code.into();
709 self
710 }
711
712 #[inline]
713 pub fn with_remark(&mut self, remark: impl Into<CheetahString>) -> &mut Self {
714 self.remark = Some(remark.into());
715 self
716 }
717
718 #[inline]
719 pub fn get_ext_fields(&self) -> Option<&HashMap<CheetahString, CheetahString>> {
720 self.ext_fields.as_ref()
721 }
722
723 pub fn read_custom_header_ref<T>(&self) -> Option<&T>
724 where
725 T: CommandCustomHeader + Sync + Send + 'static,
726 {
727 match self.command_custom_header.as_ref() {
728 None => None,
729 Some(value) => value.as_ref().as_any().downcast_ref::<T>(),
730 }
731 }
732
733 pub fn read_custom_header_ref_unchecked<T>(&self) -> &T
734 where
735 T: CommandCustomHeader + Sync + Send + 'static,
736 {
737 match self.command_custom_header.as_ref() {
738 None => unsafe { hint::unreachable_unchecked() },
739 Some(value) => value.as_ref().as_any().downcast_ref::<T>().unwrap(),
740 }
741 }
742
743 pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
744 where
745 T: CommandCustomHeader + Sync + Send + 'static,
746 {
747 match self.command_custom_header.as_mut() {
748 None => None,
749 Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>(),
750 }
751 }
752
753 pub fn read_custom_header_mut_from_ref<T>(&self) -> Option<&mut T>
754 where
755 T: CommandCustomHeader + Sync + Send + 'static,
756 {
757 match self.command_custom_header.as_ref() {
758 None => None,
759 Some(value) => value.mut_from_ref().as_any_mut().downcast_mut::<T>(),
760 }
761 }
762
763 pub fn read_custom_header_mut_unchecked<T>(&mut self) -> &mut T
764 where
765 T: CommandCustomHeader + Sync + Send + 'static,
766 {
767 match self.command_custom_header.as_mut() {
768 None => unsafe { hint::unreachable_unchecked() },
769 Some(value) => value.as_mut().as_any_mut().downcast_mut::<T>().unwrap(),
770 }
771 }
772
773 pub fn command_custom_header_ref(&self) -> Option<&dyn CommandCustomHeader> {
774 match self.command_custom_header.as_ref() {
775 None => None,
776 Some(value) => Some(value.as_ref().as_ref()),
777 }
778 }
779
780 pub fn command_custom_header_mut(&mut self) -> Option<&mut dyn CommandCustomHeader> {
781 match self.command_custom_header.as_mut() {
782 None => None,
783 Some(value) => Some(value.as_mut().as_mut()),
784 }
785 }
786
787 pub fn create_new_request_id() -> i32 {
788 requestId.fetch_add(1, Ordering::AcqRel)
789 }
790
791 #[inline]
792 pub fn add_ext_field_if_not_exist(
793 &mut self,
794 key: impl Into<CheetahString>,
795 value: impl Into<CheetahString>,
796 ) {
797 if let Some(ref mut ext) = self.ext_fields {
798 ext.entry(key.into()).or_insert(value.into());
799 }
800 }
801}
802
803pub fn parse_header_length(size: i32) -> usize {
804 (size & 0xFFFFFF) as usize
805}
806
807pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
808 ((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
809}
810
811pub fn parse_serialize_type(size: i32) -> rocketmq_error::RocketMQResult<SerializeType> {
812 let code = (size >> 24) as u8;
813 match SerializeType::value_of(code) {
814 None => Err(RocketmqError::NotSupportSerializeType(code)),
815 Some(value) => Ok(value),
816 }
817}
818
819impl AsRef<RemotingCommand> for RemotingCommand {
820 #[inline]
821 fn as_ref(&self) -> &RemotingCommand {
822 self
823 }
824}
825
826impl AsMut<RemotingCommand> for RemotingCommand {
827 #[inline]
828 fn as_mut(&mut self) -> &mut RemotingCommand {
829 self
830 }
831}
832
833#[cfg(test)]
834mod tests {
835 use super::*;
836
837 #[test]
838 fn test_remoting_command() {
839 let command = RemotingCommand::create_remoting_command(1)
840 .set_code(1)
841 .set_language(LanguageCode::JAVA)
842 .set_opaque(1)
843 .set_flag(1)
844 .set_ext_fields(HashMap::new())
845 .set_remark_option(Some("remark".to_string()));
846
847 assert_eq!(
848 "{\"code\":1,\"language\":\"JAVA\",\"version\":0,\"opaque\":1,\"flag\":1,\"remark\":\"\
849 remark\",\"extFields\":{},\"serializeTypeCurrentRPC\":\"JSON\"}",
850 serde_json::to_string(&command).unwrap()
851 );
852 }
853
854 #[test]
855 fn test_mark_serialize_type() {
856 let i = RemotingCommand::mark_serialize_type(261, SerializeType::JSON);
857 assert_eq!(i, 261);
858
859 let i = RemotingCommand::mark_serialize_type(16777215, SerializeType::JSON);
860 assert_eq!(i, 16777215);
861
862 println!("i={}", RemotingCommand::default().opaque);
863 println!("i={}", RemotingCommand::default().opaque);
864 println!("i={}", RemotingCommand::default().opaque);
865 println!("i={}", RemotingCommand::default().opaque);
866 }
867}