// rocketmq_remoting/clients.rs
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17pub use blocking_client::BlockingClient;
18use cheetah_string::CheetahString;
19pub use client::Client;
20
21use crate::base::response_future::ResponseFuture;
22use crate::protocol::remoting_command::RemotingCommand;
23use crate::remoting::InvokeCallback;
24use crate::remoting::RemotingService;
25use crate::runtime::processor::RequestProcessor;
26
27mod async_client;
28mod blocking_client;
29
30mod client;
31pub mod rocketmq_default_impl;
32
33/// `RemotingClient` trait extends `RemotingService` to provide client-specific remote interaction
34/// functionalities.
35///
36/// This trait defines methods for managing name remoting_server addresses, invoking commands
37/// asynchronously or without expecting a response, checking if an address is reachable, and closing
38/// clients connected to specific addresses.
39#[allow(async_fn_in_trait)]
40pub trait RemotingClient: RemotingService {
41 /// Updates the list of name remoting_server addresses.
42 ///
43 /// # Arguments
44 /// * `addrs` - A list of name remoting_server addresses to update.
45 async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>);
46
47 /// Retrieves the current list of name remoting_server addresses.
48 ///
49 /// # Returns
50 /// A vector containing the current list of name remoting_server addresses.
51 fn get_name_server_address_list(&self) -> &[CheetahString];
52
53 /// Retrieves a list of available name remoting_server addresses.
54 ///
55 /// # Returns
56 /// A vector containing the list of available name remoting_server addresses.
57 fn get_available_name_srv_list(&self) -> Vec<CheetahString>;
58
59 /// Asynchronously invokes a command on a specified address.
60 ///
61 /// # Arguments
62 /// * `addr` - The address to invoke the command on.
63 /// * `request` - The `RemotingCommand` to be sent.
64 /// * `timeout_millis` - The timeout for the operation in milliseconds.
65 ///
66 /// # Returns
67 /// A `Result` containing either the response `RemotingCommand` or an `Error`.
68 async fn invoke_async(
69 &self,
70 addr: Option<&CheetahString>,
71 request: RemotingCommand,
72 timeout_millis: u64,
73 ) -> rocketmq_error::RocketMQResult<RemotingCommand>;
74
75 /// Invokes a command on a specified address without waiting for a response.
76 ///
77 /// # Arguments
78 /// * `addr` - The address to invoke the command on.
79 /// * `request` - The `RemotingCommand` to be sent.
80 /// * `timeout_millis` - The timeout for the operation in milliseconds.
81 async fn invoke_oneway(
82 &self,
83 addr: &CheetahString,
84 request: RemotingCommand,
85 timeout_millis: u64,
86 );
87
88 /// Checks if a specified address is reachable.
89 ///
90 /// # Arguments
91 /// * `addr` - The address to check for reachability.
92 fn is_address_reachable(&mut self, addr: &CheetahString);
93
94 /// Closes clients connected to the specified addresses.
95 ///
96 /// # Arguments
97 /// * `addrs` - A list of addresses whose clients should be closed.
98 fn close_clients(&mut self, addrs: Vec<String>);
99
100 fn register_processor(&mut self, processor: impl RequestProcessor + Sync);
101}
102
103impl<T> InvokeCallback for T
104where
105 T: Fn(Option<RemotingCommand>, Option<Box<dyn std::error::Error>>, Option<ResponseFuture>)
106 + Send
107 + Sync,
108{
109 fn operation_complete(&self, response_future: ResponseFuture) {
110 self(None, None, Some(response_future))
111 }
112
113 fn operation_succeed(&self, response: RemotingCommand) {
114 self(Some(response), None, None)
115 }
116
117 fn operation_fail(&self, throwable: Box<dyn std::error::Error>) {
118 self(None, Some(throwable), None)
119 }
120}