rocketmq_remoting/rpc/
rpc_client_utils.rs1use std::any::Any;
19
20use bytes::Bytes;
21use bytes::BytesMut;
22
23use crate::protocol::command_custom_header::CommandCustomHeader;
24use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
25use crate::protocol::remoting_command::RemotingCommand;
26use crate::protocol::RemotingSerializable;
27use crate::rpc::rpc_request::RpcRequest;
28use crate::rpc::rpc_response::RpcResponse;
29
30pub struct RpcClientUtils;
31
32impl RpcClientUtils {
33 pub fn create_command_for_rpc_request<H: CommandCustomHeader + TopicRequestHeaderTrait>(
34 rpc_request: RpcRequest<H>,
35 ) -> RemotingCommand {
36 let result = RemotingCommand::create_request_command(rpc_request.code, rpc_request.header);
37 if let Some(body) = rpc_request.body {
38 if let Some(body) = Self::encode_body(&*body) {
39 return result.set_body(body);
40 }
41 }
42 result
43 }
44
45 pub fn create_command_for_rpc_response(mut rpc_response: RpcResponse) -> RemotingCommand {
46 let mut cmd = match rpc_response.header.take() {
47 None => RemotingCommand::create_response_command_with_code(rpc_response.code),
48 Some(value) => RemotingCommand::create_response_command()
49 .set_command_custom_header_origin(Some(value)),
50 };
51 match rpc_response.exception {
52 None => {}
53 Some(value) => cmd.set_remark_mut(value.to_string()),
54 }
55 if let Some(ref _body) = rpc_response.body {
56 return cmd;
57 }
58 cmd
59 }
60
61 pub fn encode_body(body: &dyn Any) -> Option<Bytes> {
62 if body.is::<()>() {
63 None
64 } else if let Some(bytes) = body.downcast_ref::<Bytes>() {
65 Some(bytes.clone())
66 } else if let Some(remoting_serializable) = body.downcast_ref::<&dyn RemotingSerializable>()
67 {
68 Some(Bytes::from(
69 remoting_serializable.encode().expect("encode failed"),
70 ))
71 } else if let Some(buffer) = body.downcast_ref::<BytesMut>() {
72 let data = buffer.clone().freeze();
73 Some(data)
74 } else {
75 None
76 }
77 }
78}