rocketmq_remoting/rpc/
rpc_client_impl.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::any::Any;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::message::message_queue::MessageQueue;
21use rocketmq_error::RocketmqError::RpcError;
22use rocketmq_rust::ArcMut;
23
24use crate::clients::rocketmq_tokio_client::RocketmqDefaultClient;
25use crate::clients::RemotingClient;
26use crate::code::request_code::RequestCode;
27use crate::code::response_code::ResponseCode;
28use crate::protocol::command_custom_header::CommandCustomHeader;
29use crate::protocol::header::get_earliest_msg_storetime_response_header::GetEarliestMsgStoretimeResponseHeader;
30use crate::protocol::header::get_max_offset_response_header::GetMaxOffsetResponseHeader;
31use crate::protocol::header::get_min_offset_response_header::GetMinOffsetResponseHeader;
32use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
33use crate::protocol::header::pull_message_response_header::PullMessageResponseHeader;
34use crate::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader;
35use crate::protocol::header::search_offset_response_header::SearchOffsetResponseHeader;
36use crate::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetResponseHeader;
37use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
38use crate::rpc::client_metadata::ClientMetadata;
39use crate::rpc::rpc_client::RpcClient;
40use crate::rpc::rpc_client_hook::RpcClientHookFn;
41use crate::rpc::rpc_client_utils::RpcClientUtils;
42use crate::rpc::rpc_request::RpcRequest;
43use crate::rpc::rpc_response::RpcResponse;
44
45pub struct RpcClientImpl {
46    client_metadata: ClientMetadata,
47    remoting_client: ArcMut<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
48    client_hook_list: Vec<RpcClientHookFn>,
49}
50
51impl RpcClientImpl {
52    pub fn new(
53        client_metadata: ClientMetadata,
54        remoting_client: ArcMut<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
55    ) -> Self {
56        RpcClientImpl {
57            client_metadata,
58            remoting_client,
59            client_hook_list: Vec::new(),
60        }
61    }
62
63    pub fn register_client_hook(&mut self, client_hook: RpcClientHookFn) {
64        self.client_hook_list.push(client_hook);
65    }
66
67    pub fn clear_client_hook(&mut self) {
68        self.client_hook_list.clear();
69    }
70
71    fn get_broker_addr_by_name_or_exception(
72        &self,
73        broker_name: &str,
74    ) -> rocketmq_error::RocketMQResult<CheetahString> {
75        match self.client_metadata.find_master_broker_addr(broker_name) {
76            None => Err(RpcError(
77                From::from(ResponseCode::SystemError),
78                format!("cannot find addr for broker {broker_name}"),
79            )
80            .into()),
81            Some(value) => Ok(value),
82        }
83    }
84
85    async fn handle_pull_message<H: CommandCustomHeader + TopicRequestHeaderTrait>(
86        &self,
87        addr: &CheetahString,
88        request: RpcRequest<H>,
89        timeout_millis: u64,
90    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
91        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
92        match self
93            .remoting_client
94            .invoke_request(Some(addr), request_command, timeout_millis)
95            .await
96        {
97            Ok(response) => match ResponseCode::from(response.code()) {
98                ResponseCode::Success
99                | ResponseCode::PullNotFound
100                | ResponseCode::PullRetryImmediately
101                | ResponseCode::PullOffsetMoved => {
102                    let response_header =
103                        response.decode_command_custom_header::<PullMessageResponseHeader>()?;
104                    let body = response
105                        .body()
106                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
107                    let rpc_response =
108                        RpcResponse::new(response.code(), Box::new(response_header), body);
109                    Ok(rpc_response)
110                }
111                _ => Ok(RpcResponse::new_exception(Some(RpcError(
112                    response.code(),
113                    "unexpected remote response code".to_string(),
114                )))),
115            },
116            Err(_error) => Err(RpcError(
117                From::from(ResponseCode::SystemError),
118                format!("process failed. addr: {addr}. Request"),
119            )
120            .into()),
121        }
122    }
123
124    async fn handle_get_min_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
125        &self,
126        addr: &CheetahString,
127        request: RpcRequest<H>,
128        timeout_millis: u64,
129    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
130        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
131        match self
132            .remoting_client
133            .invoke_request(Some(addr), request_command, timeout_millis)
134            .await
135        {
136            Ok(response) => match ResponseCode::from(response.code()) {
137                ResponseCode::Success => {
138                    let response_header =
139                        response.decode_command_custom_header::<GetMinOffsetResponseHeader>()?;
140                    let body = response
141                        .body()
142                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
143                    let rpc_response =
144                        RpcResponse::new(response.code(), Box::new(response_header), body);
145                    Ok(rpc_response)
146                }
147                _ => Ok(RpcResponse::new_exception(Some(RpcError(
148                    response.code(),
149                    "unknown remote error".to_string(),
150                )))),
151            },
152            Err(_error) => Err(RpcError(
153                From::from(ResponseCode::SystemError),
154                format!("process failed. addr: {addr}. Request"),
155            )
156            .into()),
157        }
158    }
159    async fn handle_get_max_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
160        &self,
161        addr: &CheetahString,
162        request: RpcRequest<H>,
163        timeout_millis: u64,
164    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
165        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
166        match self
167            .remoting_client
168            .invoke_request(Some(addr), request_command, timeout_millis)
169            .await
170        {
171            Ok(response) => match ResponseCode::from(response.code()) {
172                ResponseCode::Success => {
173                    let response_header =
174                        response.decode_command_custom_header::<GetMaxOffsetResponseHeader>()?;
175                    let body = response
176                        .body()
177                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
178                    let rpc_response =
179                        RpcResponse::new(response.code(), Box::new(response_header), body);
180                    Ok(rpc_response)
181                }
182                _ => Ok(RpcResponse::new_exception(Some(RpcError(
183                    response.code(),
184                    "unknown remote error".to_string(),
185                )))),
186            },
187            Err(_error) => Err(RpcError(
188                From::from(ResponseCode::SystemError),
189                format!("process failed. addr: {addr}. Request"),
190            )
191            .into()),
192        }
193    }
194    async fn handle_search_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
195        &self,
196        addr: &CheetahString,
197        request: RpcRequest<H>,
198        timeout_millis: u64,
199    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
200        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
201        match self
202            .remoting_client
203            .invoke_request(Some(addr), request_command, timeout_millis)
204            .await
205        {
206            Ok(response) => match ResponseCode::from(response.code()) {
207                ResponseCode::Success => {
208                    let response_header =
209                        response.decode_command_custom_header::<SearchOffsetResponseHeader>()?;
210                    let body = response
211                        .body()
212                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
213                    let rpc_response =
214                        RpcResponse::new(response.code(), Box::new(response_header), body);
215                    Ok(rpc_response)
216                }
217                _ => Ok(RpcResponse::new_exception(Some(RpcError(
218                    response.code(),
219                    "unknown remote error".to_string(),
220                )))),
221            },
222            Err(_error) => Err(RpcError(
223                From::from(ResponseCode::SystemError),
224                format!("process failed. addr: {addr}. Request"),
225            )
226            .into()),
227        }
228    }
229    async fn handle_get_earliest_msg_storetime<H: CommandCustomHeader + TopicRequestHeaderTrait>(
230        &self,
231        addr: &CheetahString,
232        request: RpcRequest<H>,
233        timeout_millis: u64,
234    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
235        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
236        match self
237            .remoting_client
238            .invoke_request(Some(addr), request_command, timeout_millis)
239            .await
240        {
241            Ok(response) => match ResponseCode::from(response.code()) {
242                ResponseCode::Success => {
243                    let response_header = response
244                        .decode_command_custom_header::<GetEarliestMsgStoretimeResponseHeader>()?;
245                    let body = response
246                        .body()
247                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
248                    let rpc_response =
249                        RpcResponse::new(response.code(), Box::new(response_header), body);
250                    Ok(rpc_response)
251                }
252                _ => Ok(RpcResponse::new_exception(Some(RpcError(
253                    response.code(),
254                    "unknown remote error".to_string(),
255                )))),
256            },
257            Err(_error) => Err(RpcError(
258                From::from(ResponseCode::SystemError),
259                format!("process failed. addr: {addr}. Request"),
260            )
261            .into()),
262        }
263    }
264    async fn handle_query_consumer_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
265        &self,
266        addr: &CheetahString,
267        request: RpcRequest<H>,
268        timeout_millis: u64,
269    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
270        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
271        match self
272            .remoting_client
273            .invoke_request(Some(addr), request_command, timeout_millis)
274            .await
275        {
276            Ok(response) => match ResponseCode::from(response.code()) {
277                ResponseCode::Success => {
278                    let response_header = response
279                        .decode_command_custom_header::<QueryConsumerOffsetResponseHeader>()?;
280                    let body = response
281                        .body()
282                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
283                    let rpc_response =
284                        RpcResponse::new(response.code(), Box::new(response_header), body);
285                    Ok(rpc_response)
286                }
287                ResponseCode::QueryNotFound => {
288                    let rpc_response = RpcResponse::new_option(response.code(), None);
289                    Ok(rpc_response)
290                }
291                _ => Ok(RpcResponse::new_exception(Some(RpcError(
292                    response.code(),
293                    "unknown remote error".to_string(),
294                )))),
295            },
296            Err(_error) => Err(RpcError(
297                From::from(ResponseCode::SystemError),
298                format!("process failed. addr: {addr}. Request"),
299            )
300            .into()),
301        }
302    }
303    async fn handle_update_consumer_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
304        &self,
305        addr: &CheetahString,
306        request: RpcRequest<H>,
307        timeout_millis: u64,
308    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
309        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
310        match self
311            .remoting_client
312            .invoke_request(Some(addr), request_command, timeout_millis)
313            .await
314        {
315            Ok(response) => match ResponseCode::from(response.code()) {
316                ResponseCode::Success => {
317                    let response_header = response
318                        .decode_command_custom_header::<UpdateConsumerOffsetResponseHeader>()?;
319                    let body = response
320                        .body()
321                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
322                    let rpc_response =
323                        RpcResponse::new(response.code(), Box::new(response_header), body);
324                    Ok(rpc_response)
325                }
326                _ => Ok(RpcResponse::new_exception(Some(RpcError(
327                    response.code(),
328                    "unknown remote error".to_string(),
329                )))),
330            },
331            Err(_error) => Err(RpcError(
332                From::from(ResponseCode::SystemError),
333                format!("process failed. addr: {addr}. Request"),
334            )
335            .into()),
336        }
337    }
338    async fn handle_common_body_request<H: CommandCustomHeader + TopicRequestHeaderTrait>(
339        &self,
340        addr: &CheetahString,
341        request: RpcRequest<H>,
342        timeout_millis: u64,
343    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
344        let request_command = RpcClientUtils::create_command_for_rpc_request(request);
345        match self
346            .remoting_client
347            .invoke_request(Some(addr), request_command, timeout_millis)
348            .await
349        {
350            Ok(response) => match ResponseCode::from(response.code()) {
351                ResponseCode::Success => {
352                    let body = response
353                        .body()
354                        .map(|value| Box::new(value.clone()) as Box<dyn Any>);
355                    let rpc_response = RpcResponse::new_option(response.code(), body);
356                    Ok(rpc_response)
357                }
358                _ => Ok(RpcResponse::new_exception(Some(RpcError(
359                    response.code(),
360                    "unknown remote error".to_string(),
361                )))),
362            },
363            Err(_error) => Err(RpcError(
364                From::from(ResponseCode::SystemError),
365                format!("process failed. addr: {addr}. Request"),
366            )
367            .into()),
368        }
369    }
370}
371
372impl RpcClient for RpcClientImpl {
373    async fn invoke<H: CommandCustomHeader + TopicRequestHeaderTrait>(
374        &self,
375        request: RpcRequest<H>,
376        timeout_millis: u64,
377    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
378        if !self.client_hook_list.is_empty() {
379            for hook in self.client_hook_list.iter() {
380                // let result = hook.before_request(&request)?;
381                let result = hook(Some(&request.header), None)?;
382                if let Some(result) = result {
383                    return Ok(result);
384                }
385            }
386        }
387        let bname = request
388            .header
389            .broker_name()
390            .expect("broker name is required");
391        let addr = self.get_broker_addr_by_name_or_exception(bname.as_ref())?;
392        let result = match RequestCode::from(request.code) {
393            RequestCode::PullMessage => {
394                self.handle_pull_message(&addr, request, timeout_millis)
395                    .await?
396            }
397            RequestCode::GetMinOffset => {
398                self.handle_get_min_offset(&addr, request, timeout_millis)
399                    .await?
400            }
401            RequestCode::GetMaxOffset => {
402                self.handle_get_max_offset(&addr, request, timeout_millis)
403                    .await?
404            }
405            RequestCode::SearchOffsetByTimestamp => {
406                self.handle_search_offset(&addr, request, timeout_millis)
407                    .await?
408            }
409            RequestCode::GetEarliestMsgStoreTime => {
410                self.handle_get_earliest_msg_storetime(&addr, request, timeout_millis)
411                    .await?
412            }
413            RequestCode::QueryConsumerOffset => {
414                self.handle_query_consumer_offset(&addr, request, timeout_millis)
415                    .await?
416            }
417            RequestCode::UpdateConsumerOffset => {
418                self.handle_update_consumer_offset(&addr, request, timeout_millis)
419                    .await?
420            }
421            RequestCode::GetTopicStatsInfo => {
422                self.handle_common_body_request(&addr, request, timeout_millis)
423                    .await?
424            }
425            RequestCode::GetTopicConfig => {
426                self.handle_common_body_request(&addr, request, timeout_millis)
427                    .await?
428            }
429            _ => {
430                return Err(RpcError(
431                    From::from(ResponseCode::RequestCodeNotSupported),
432                    format!("unknown request code {}", request.code),
433                )
434                .into())
435            }
436        };
437        Ok(result)
438    }
439
440    async fn invoke_mq<H: CommandCustomHeader + TopicRequestHeaderTrait>(
441        &self,
442        mq: MessageQueue,
443        mut request: RpcRequest<H>,
444        timeout_millis: u64,
445    ) -> rocketmq_error::RocketMQResult<RpcResponse> {
446        if let Some(broker_name) = self.client_metadata.get_broker_name_from_message_queue(&mq) {
447            request.header.set_broker_name(broker_name);
448        }
449        self.invoke(request, timeout_millis).await
450    }
451}