rocketmq_remoting/
clients.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17pub use blocking_client::BlockingClient;
18use cheetah_string::CheetahString;
19pub use client::Client;
20
21use crate::base::response_future::ResponseFuture;
22use crate::protocol::remoting_command::RemotingCommand;
23use crate::remoting::InvokeCallback;
24use crate::remoting::RemotingService;
25use crate::runtime::processor::RequestProcessor;
26
27mod async_client;
28mod blocking_client;
29
30mod client;
31pub mod rocketmq_default_impl;
32
33/// `RemotingClient` trait extends `RemotingService` to provide client-specific remote interaction
34/// functionalities.
35///
36/// This trait defines methods for managing name remoting_server addresses, invoking commands
37/// asynchronously or without expecting a response, checking if an address is reachable, and closing
38/// clients connected to specific addresses.
39#[allow(async_fn_in_trait)]
40pub trait RemotingClient: RemotingService {
41    /// Updates the list of name remoting_server addresses.
42    ///
43    /// # Arguments
44    /// * `addrs` - A list of name remoting_server addresses to update.
45    async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>);
46
47    /// Retrieves the current list of name remoting_server addresses.
48    ///
49    /// # Returns
50    /// A vector containing the current list of name remoting_server addresses.
51    fn get_name_server_address_list(&self) -> &[CheetahString];
52
53    /// Retrieves a list of available name remoting_server addresses.
54    ///
55    /// # Returns
56    /// A vector containing the list of available name remoting_server addresses.
57    fn get_available_name_srv_list(&self) -> Vec<CheetahString>;
58
59    /// Asynchronously invokes a command on a specified address.
60    ///
61    /// # Arguments
62    /// * `addr` - The address to invoke the command on.
63    /// * `request` - The `RemotingCommand` to be sent.
64    /// * `timeout_millis` - The timeout for the operation in milliseconds.
65    ///
66    /// # Returns
67    /// A `Result` containing either the response `RemotingCommand` or an `Error`.
68    async fn invoke_async(
69        &self,
70        addr: Option<&CheetahString>,
71        request: RemotingCommand,
72        timeout_millis: u64,
73    ) -> rocketmq_error::RocketMQResult<RemotingCommand>;
74
75    /// Invokes a command on a specified address without waiting for a response.
76    ///
77    /// # Arguments
78    /// * `addr` - The address to invoke the command on.
79    /// * `request` - The `RemotingCommand` to be sent.
80    /// * `timeout_millis` - The timeout for the operation in milliseconds.
81    async fn invoke_oneway(
82        &self,
83        addr: &CheetahString,
84        request: RemotingCommand,
85        timeout_millis: u64,
86    );
87
88    /// Checks if a specified address is reachable.
89    ///
90    /// # Arguments
91    /// * `addr` - The address to check for reachability.
92    fn is_address_reachable(&mut self, addr: &CheetahString);
93
94    /// Closes clients connected to the specified addresses.
95    ///
96    /// # Arguments
97    /// * `addrs` - A list of addresses whose clients should be closed.
98    fn close_clients(&mut self, addrs: Vec<String>);
99
100    fn register_processor(&mut self, processor: impl RequestProcessor + Sync);
101}
102
103impl<T> InvokeCallback for T
104where
105    T: Fn(Option<RemotingCommand>, Option<Box<dyn std::error::Error>>, Option<ResponseFuture>)
106        + Send
107        + Sync,
108{
109    fn operation_complete(&self, response_future: ResponseFuture) {
110        self(None, None, Some(response_future))
111    }
112
113    fn operation_succeed(&self, response: RemotingCommand) {
114        self(Some(response), None, None)
115    }
116
117    fn operation_fail(&self, throwable: Box<dyn std::error::Error>) {
118        self(None, Some(throwable), None)
119    }
120}