rocketmq_remoting/clients.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17pub use blocking_client::BlockingClient;
18use cheetah_string::CheetahString;
19pub use client::Client;
20
21use crate::base::response_future::ResponseFuture;
22use crate::protocol::remoting_command::RemotingCommand;
23use crate::remoting::InvokeCallback;
24use crate::remoting::RemotingService;
25use crate::runtime::processor::RequestProcessor;
26
27mod async_client;
28mod blocking_client;
29
30mod client;
31pub mod connection_pool;
32pub(crate) mod nameserver_selector;
33pub mod reconnect;
34pub mod rocketmq_tokio_client;
35
/// Client-side extension of [`RemotingService`] for talking to remote peers.
///
/// On top of the service contract inherited from `RemotingService`, this trait declares
/// operations for maintaining the name server address list, sending a `RemotingCommand`
/// either with a response or one-way, probing an address, closing per-address clients,
/// and registering a request processor.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because callers cannot
// name or bound (e.g. with `Send`) the returned futures; the lint is silenced here.
#[allow(async_fn_in_trait)]
pub trait RemotingClient: RemotingService {
    /// Updates the list of name server addresses.
    ///
    /// # Arguments
    /// * `addrs` - The name server addresses to apply; ownership of the vector is taken.
    async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>);

    /// Retrieves the current list of name server addresses.
    ///
    /// # Returns
    /// A slice of the name server addresses, borrowed from `self`.
    fn get_name_server_address_list(&self) -> &[CheetahString];

    /// Retrieves the list of available name server addresses.
    ///
    /// # Returns
    /// An owned vector of name server addresses.
    ///
    /// NOTE(review): what qualifies an address as "available" is decided by the implementor
    /// and is not visible from this declaration.
    fn get_available_name_srv_list(&self) -> Vec<CheetahString>;

    /// Asynchronously sends a command and waits for its response.
    ///
    /// # Arguments
    /// * `addr` - The address to invoke the command on. The parameter is optional;
    ///   NOTE(review): presumably `None` lets the implementation choose a name server —
    ///   confirm against the implementors.
    /// * `request` - The `RemotingCommand` to be sent.
    /// * `timeout_millis` - The timeout for the operation in milliseconds.
    ///
    /// # Returns
    /// A `RocketMQResult` holding the response `RemotingCommand` on success, or an error.
    async fn invoke_request(
        &self,
        addr: Option<&CheetahString>,
        request: RemotingCommand,
        timeout_millis: u64,
    ) -> rocketmq_error::RocketMQResult<RemotingCommand>;

    /// Sends a command to the given address without waiting for a response.
    ///
    /// This method returns nothing, so no outcome is reported to the caller through the
    /// return value.
    ///
    /// # Arguments
    /// * `addr` - The address to invoke the command on.
    /// * `request` - The `RemotingCommand` to be sent.
    /// * `timeout_millis` - The timeout for the operation in milliseconds.
    async fn invoke_request_oneway(
        &self,
        addr: &CheetahString,
        request: RemotingCommand,
        timeout_millis: u64,
    );

    /// Checks whether the specified address is reachable.
    ///
    /// The method has no return value and takes `&mut self`, so the outcome of the check is
    /// not handed back to the caller by this signature.
    /// NOTE(review): presumably the result is recorded in the client's own state — confirm.
    ///
    /// # Arguments
    /// * `addr` - The address to check for reachability.
    fn is_address_reachable(&mut self, addr: &CheetahString);

    /// Closes the clients connected to the specified addresses.
    ///
    /// # Arguments
    /// * `addrs` - The addresses whose clients should be closed. Note that these are plain
    ///   `String`s, unlike the `CheetahString` addresses used by the other methods.
    fn close_clients(&mut self, addrs: Vec<String>);

    /// Registers a request processor with this client.
    ///
    /// # Arguments
    /// * `processor` - Any value implementing `RequestProcessor` that is also `Sync`;
    ///   ownership is transferred to the client.
    ///
    /// NOTE(review): presumably the processor handles requests initiated by the remote side —
    /// confirm against the implementors.
    fn register_processor(&mut self, processor: impl RequestProcessor + Sync);
}
105
106impl<T> InvokeCallback for T
107where
108 T: Fn(Option<RemotingCommand>, Option<Box<dyn std::error::Error>>, Option<ResponseFuture>)
109 + Send
110 + Sync,
111{
112 fn operation_complete(&self, response_future: ResponseFuture) {
113 self(None, None, Some(response_future))
114 }
115
116 fn operation_succeed(&self, response: RemotingCommand) {
117 self(Some(response), None, None)
118 }
119
120 fn operation_fail(&self, throwable: Box<dyn std::error::Error>) {
121 self(None, Some(throwable), None)
122 }
123}