1use std::any::Any;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::message::message_queue::MessageQueue;
21use rocketmq_error::RocketmqError::RpcError;
22use rocketmq_rust::ArcMut;
23
24use crate::clients::rocketmq_tokio_client::RocketmqDefaultClient;
25use crate::clients::RemotingClient;
26use crate::code::request_code::RequestCode;
27use crate::code::response_code::ResponseCode;
28use crate::protocol::command_custom_header::CommandCustomHeader;
29use crate::protocol::header::get_earliest_msg_storetime_response_header::GetEarliestMsgStoretimeResponseHeader;
30use crate::protocol::header::get_max_offset_response_header::GetMaxOffsetResponseHeader;
31use crate::protocol::header::get_min_offset_response_header::GetMinOffsetResponseHeader;
32use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
33use crate::protocol::header::pull_message_response_header::PullMessageResponseHeader;
34use crate::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader;
35use crate::protocol::header::search_offset_response_header::SearchOffsetResponseHeader;
36use crate::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetResponseHeader;
37use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
38use crate::rpc::client_metadata::ClientMetadata;
39use crate::rpc::rpc_client::RpcClient;
40use crate::rpc::rpc_client_hook::RpcClientHookFn;
41use crate::rpc::rpc_client_utils::RpcClientUtils;
42use crate::rpc::rpc_request::RpcRequest;
43use crate::rpc::rpc_response::RpcResponse;
44
/// [`RpcClient`] implementation that resolves a broker address from client metadata and sends
/// RPC requests to it through the remoting client.
pub struct RpcClientImpl {
    /// Metadata used to map broker names (and message queues) to master broker addresses.
    client_metadata: ClientMetadata,
    /// Remoting client used to send request commands to a broker address.
    remoting_client: ArcMut<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
    /// Hooks consulted, in registration order, before a request is dispatched.
    client_hook_list: Vec<RpcClientHookFn>,
}
50
51impl RpcClientImpl {
52 pub fn new(
53 client_metadata: ClientMetadata,
54 remoting_client: ArcMut<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
55 ) -> Self {
56 RpcClientImpl {
57 client_metadata,
58 remoting_client,
59 client_hook_list: Vec::new(),
60 }
61 }
62
63 pub fn register_client_hook(&mut self, client_hook: RpcClientHookFn) {
64 self.client_hook_list.push(client_hook);
65 }
66
67 pub fn clear_client_hook(&mut self) {
68 self.client_hook_list.clear();
69 }
70
71 fn get_broker_addr_by_name_or_exception(
72 &self,
73 broker_name: &str,
74 ) -> rocketmq_error::RocketMQResult<CheetahString> {
75 match self.client_metadata.find_master_broker_addr(broker_name) {
76 None => Err(RpcError(
77 From::from(ResponseCode::SystemError),
78 format!("cannot find addr for broker {broker_name}"),
79 )
80 .into()),
81 Some(value) => Ok(value),
82 }
83 }
84
85 async fn handle_pull_message<H: CommandCustomHeader + TopicRequestHeaderTrait>(
86 &self,
87 addr: &CheetahString,
88 request: RpcRequest<H>,
89 timeout_millis: u64,
90 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
91 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
92 match self
93 .remoting_client
94 .invoke_request(Some(addr), request_command, timeout_millis)
95 .await
96 {
97 Ok(response) => match ResponseCode::from(response.code()) {
98 ResponseCode::Success
99 | ResponseCode::PullNotFound
100 | ResponseCode::PullRetryImmediately
101 | ResponseCode::PullOffsetMoved => {
102 let response_header =
103 response.decode_command_custom_header::<PullMessageResponseHeader>()?;
104 let body = response
105 .body()
106 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
107 let rpc_response =
108 RpcResponse::new(response.code(), Box::new(response_header), body);
109 Ok(rpc_response)
110 }
111 _ => Ok(RpcResponse::new_exception(Some(RpcError(
112 response.code(),
113 "unexpected remote response code".to_string(),
114 )))),
115 },
116 Err(_error) => Err(RpcError(
117 From::from(ResponseCode::SystemError),
118 format!("process failed. addr: {addr}. Request"),
119 )
120 .into()),
121 }
122 }
123
124 async fn handle_get_min_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
125 &self,
126 addr: &CheetahString,
127 request: RpcRequest<H>,
128 timeout_millis: u64,
129 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
130 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
131 match self
132 .remoting_client
133 .invoke_request(Some(addr), request_command, timeout_millis)
134 .await
135 {
136 Ok(response) => match ResponseCode::from(response.code()) {
137 ResponseCode::Success => {
138 let response_header =
139 response.decode_command_custom_header::<GetMinOffsetResponseHeader>()?;
140 let body = response
141 .body()
142 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
143 let rpc_response =
144 RpcResponse::new(response.code(), Box::new(response_header), body);
145 Ok(rpc_response)
146 }
147 _ => Ok(RpcResponse::new_exception(Some(RpcError(
148 response.code(),
149 "unknown remote error".to_string(),
150 )))),
151 },
152 Err(_error) => Err(RpcError(
153 From::from(ResponseCode::SystemError),
154 format!("process failed. addr: {addr}. Request"),
155 )
156 .into()),
157 }
158 }
159 async fn handle_get_max_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
160 &self,
161 addr: &CheetahString,
162 request: RpcRequest<H>,
163 timeout_millis: u64,
164 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
165 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
166 match self
167 .remoting_client
168 .invoke_request(Some(addr), request_command, timeout_millis)
169 .await
170 {
171 Ok(response) => match ResponseCode::from(response.code()) {
172 ResponseCode::Success => {
173 let response_header =
174 response.decode_command_custom_header::<GetMaxOffsetResponseHeader>()?;
175 let body = response
176 .body()
177 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
178 let rpc_response =
179 RpcResponse::new(response.code(), Box::new(response_header), body);
180 Ok(rpc_response)
181 }
182 _ => Ok(RpcResponse::new_exception(Some(RpcError(
183 response.code(),
184 "unknown remote error".to_string(),
185 )))),
186 },
187 Err(_error) => Err(RpcError(
188 From::from(ResponseCode::SystemError),
189 format!("process failed. addr: {addr}. Request"),
190 )
191 .into()),
192 }
193 }
194 async fn handle_search_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
195 &self,
196 addr: &CheetahString,
197 request: RpcRequest<H>,
198 timeout_millis: u64,
199 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
200 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
201 match self
202 .remoting_client
203 .invoke_request(Some(addr), request_command, timeout_millis)
204 .await
205 {
206 Ok(response) => match ResponseCode::from(response.code()) {
207 ResponseCode::Success => {
208 let response_header =
209 response.decode_command_custom_header::<SearchOffsetResponseHeader>()?;
210 let body = response
211 .body()
212 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
213 let rpc_response =
214 RpcResponse::new(response.code(), Box::new(response_header), body);
215 Ok(rpc_response)
216 }
217 _ => Ok(RpcResponse::new_exception(Some(RpcError(
218 response.code(),
219 "unknown remote error".to_string(),
220 )))),
221 },
222 Err(_error) => Err(RpcError(
223 From::from(ResponseCode::SystemError),
224 format!("process failed. addr: {addr}. Request"),
225 )
226 .into()),
227 }
228 }
229 async fn handle_get_earliest_msg_storetime<H: CommandCustomHeader + TopicRequestHeaderTrait>(
230 &self,
231 addr: &CheetahString,
232 request: RpcRequest<H>,
233 timeout_millis: u64,
234 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
235 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
236 match self
237 .remoting_client
238 .invoke_request(Some(addr), request_command, timeout_millis)
239 .await
240 {
241 Ok(response) => match ResponseCode::from(response.code()) {
242 ResponseCode::Success => {
243 let response_header = response
244 .decode_command_custom_header::<GetEarliestMsgStoretimeResponseHeader>()?;
245 let body = response
246 .body()
247 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
248 let rpc_response =
249 RpcResponse::new(response.code(), Box::new(response_header), body);
250 Ok(rpc_response)
251 }
252 _ => Ok(RpcResponse::new_exception(Some(RpcError(
253 response.code(),
254 "unknown remote error".to_string(),
255 )))),
256 },
257 Err(_error) => Err(RpcError(
258 From::from(ResponseCode::SystemError),
259 format!("process failed. addr: {addr}. Request"),
260 )
261 .into()),
262 }
263 }
264 async fn handle_query_consumer_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
265 &self,
266 addr: &CheetahString,
267 request: RpcRequest<H>,
268 timeout_millis: u64,
269 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
270 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
271 match self
272 .remoting_client
273 .invoke_request(Some(addr), request_command, timeout_millis)
274 .await
275 {
276 Ok(response) => match ResponseCode::from(response.code()) {
277 ResponseCode::Success => {
278 let response_header = response
279 .decode_command_custom_header::<QueryConsumerOffsetResponseHeader>()?;
280 let body = response
281 .body()
282 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
283 let rpc_response =
284 RpcResponse::new(response.code(), Box::new(response_header), body);
285 Ok(rpc_response)
286 }
287 ResponseCode::QueryNotFound => {
288 let rpc_response = RpcResponse::new_option(response.code(), None);
289 Ok(rpc_response)
290 }
291 _ => Ok(RpcResponse::new_exception(Some(RpcError(
292 response.code(),
293 "unknown remote error".to_string(),
294 )))),
295 },
296 Err(_error) => Err(RpcError(
297 From::from(ResponseCode::SystemError),
298 format!("process failed. addr: {addr}. Request"),
299 )
300 .into()),
301 }
302 }
303 async fn handle_update_consumer_offset<H: CommandCustomHeader + TopicRequestHeaderTrait>(
304 &self,
305 addr: &CheetahString,
306 request: RpcRequest<H>,
307 timeout_millis: u64,
308 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
309 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
310 match self
311 .remoting_client
312 .invoke_request(Some(addr), request_command, timeout_millis)
313 .await
314 {
315 Ok(response) => match ResponseCode::from(response.code()) {
316 ResponseCode::Success => {
317 let response_header = response
318 .decode_command_custom_header::<UpdateConsumerOffsetResponseHeader>()?;
319 let body = response
320 .body()
321 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
322 let rpc_response =
323 RpcResponse::new(response.code(), Box::new(response_header), body);
324 Ok(rpc_response)
325 }
326 _ => Ok(RpcResponse::new_exception(Some(RpcError(
327 response.code(),
328 "unknown remote error".to_string(),
329 )))),
330 },
331 Err(_error) => Err(RpcError(
332 From::from(ResponseCode::SystemError),
333 format!("process failed. addr: {addr}. Request"),
334 )
335 .into()),
336 }
337 }
338 async fn handle_common_body_request<H: CommandCustomHeader + TopicRequestHeaderTrait>(
339 &self,
340 addr: &CheetahString,
341 request: RpcRequest<H>,
342 timeout_millis: u64,
343 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
344 let request_command = RpcClientUtils::create_command_for_rpc_request(request);
345 match self
346 .remoting_client
347 .invoke_request(Some(addr), request_command, timeout_millis)
348 .await
349 {
350 Ok(response) => match ResponseCode::from(response.code()) {
351 ResponseCode::Success => {
352 let body = response
353 .body()
354 .map(|value| Box::new(value.clone()) as Box<dyn Any>);
355 let rpc_response = RpcResponse::new_option(response.code(), body);
356 Ok(rpc_response)
357 }
358 _ => Ok(RpcResponse::new_exception(Some(RpcError(
359 response.code(),
360 "unknown remote error".to_string(),
361 )))),
362 },
363 Err(_error) => Err(RpcError(
364 From::from(ResponseCode::SystemError),
365 format!("process failed. addr: {addr}. Request"),
366 )
367 .into()),
368 }
369 }
370}
371
372impl RpcClient for RpcClientImpl {
373 async fn invoke<H: CommandCustomHeader + TopicRequestHeaderTrait>(
374 &self,
375 request: RpcRequest<H>,
376 timeout_millis: u64,
377 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
378 if !self.client_hook_list.is_empty() {
379 for hook in self.client_hook_list.iter() {
380 let result = hook(Some(&request.header), None)?;
382 if let Some(result) = result {
383 return Ok(result);
384 }
385 }
386 }
387 let bname = request
388 .header
389 .broker_name()
390 .expect("broker name is required");
391 let addr = self.get_broker_addr_by_name_or_exception(bname.as_ref())?;
392 let result = match RequestCode::from(request.code) {
393 RequestCode::PullMessage => {
394 self.handle_pull_message(&addr, request, timeout_millis)
395 .await?
396 }
397 RequestCode::GetMinOffset => {
398 self.handle_get_min_offset(&addr, request, timeout_millis)
399 .await?
400 }
401 RequestCode::GetMaxOffset => {
402 self.handle_get_max_offset(&addr, request, timeout_millis)
403 .await?
404 }
405 RequestCode::SearchOffsetByTimestamp => {
406 self.handle_search_offset(&addr, request, timeout_millis)
407 .await?
408 }
409 RequestCode::GetEarliestMsgStoreTime => {
410 self.handle_get_earliest_msg_storetime(&addr, request, timeout_millis)
411 .await?
412 }
413 RequestCode::QueryConsumerOffset => {
414 self.handle_query_consumer_offset(&addr, request, timeout_millis)
415 .await?
416 }
417 RequestCode::UpdateConsumerOffset => {
418 self.handle_update_consumer_offset(&addr, request, timeout_millis)
419 .await?
420 }
421 RequestCode::GetTopicStatsInfo => {
422 self.handle_common_body_request(&addr, request, timeout_millis)
423 .await?
424 }
425 RequestCode::GetTopicConfig => {
426 self.handle_common_body_request(&addr, request, timeout_millis)
427 .await?
428 }
429 _ => {
430 return Err(RpcError(
431 From::from(ResponseCode::RequestCodeNotSupported),
432 format!("unknown request code {}", request.code),
433 )
434 .into())
435 }
436 };
437 Ok(result)
438 }
439
440 async fn invoke_mq<H: CommandCustomHeader + TopicRequestHeaderTrait>(
441 &self,
442 mq: MessageQueue,
443 mut request: RpcRequest<H>,
444 timeout_millis: u64,
445 ) -> rocketmq_error::RocketMQResult<RpcResponse> {
446 if let Some(broker_name) = self.client_metadata.get_broker_name_from_message_queue(&mq) {
447 request.header.set_broker_name(broker_name);
448 }
449 self.invoke(request, timeout_millis).await
450 }
451}