rocketmq_remoting/rpc/
rpc_client_utils.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::any::Any;
19
20use bytes::Bytes;
21use bytes::BytesMut;
22
23use crate::protocol::command_custom_header::CommandCustomHeader;
24use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
25use crate::protocol::remoting_command::RemotingCommand;
26use crate::protocol::RemotingSerializable;
27use crate::rpc::rpc_request::RpcRequest;
28use crate::rpc::rpc_response::RpcResponse;
29
30pub struct RpcClientUtils;
31
32impl RpcClientUtils {
33    pub fn create_command_for_rpc_request<H: CommandCustomHeader + TopicRequestHeaderTrait>(
34        rpc_request: RpcRequest<H>,
35    ) -> RemotingCommand {
36        let result = RemotingCommand::create_request_command(rpc_request.code, rpc_request.header);
37        if let Some(body) = rpc_request.body {
38            if let Some(body) = Self::encode_body(&*body) {
39                return result.set_body(body);
40            }
41        }
42        result
43    }
44
45    pub fn create_command_for_rpc_response(mut rpc_response: RpcResponse) -> RemotingCommand {
46        let mut cmd = match rpc_response.header.take() {
47            None => RemotingCommand::create_response_command_with_code(rpc_response.code),
48            Some(value) => RemotingCommand::create_response_command()
49                .set_command_custom_header_origin(Some(value)),
50        };
51        match rpc_response.exception {
52            None => {}
53            Some(value) => cmd.set_remark_mut(value.to_string()),
54        }
55        if let Some(ref _body) = rpc_response.body {
56            return cmd;
57        }
58        cmd
59    }
60
61    pub fn encode_body(body: &dyn Any) -> Option<Bytes> {
62        if body.is::<()>() {
63            None
64        } else if let Some(bytes) = body.downcast_ref::<Bytes>() {
65            Some(bytes.clone())
66        } else if let Some(remoting_serializable) = body.downcast_ref::<&dyn RemotingSerializable>()
67        {
68            Some(Bytes::from(
69                remoting_serializable.encode().expect("encode failed"),
70            ))
71        } else if let Some(buffer) = body.downcast_ref::<BytesMut>() {
72            let data = buffer.clone().freeze();
73            Some(data)
74        } else {
75            None
76        }
77    }
78}