rocketmq_remoting/clients/
client.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use rocketmq_error::RocketMQResult;
19use rocketmq_rust::ArcMut;
20use tokio::sync::broadcast;
21use tokio::sync::mpsc::Receiver;
22
23use crate::base::connection_net_event::ConnectionNetEvent;
24use crate::base::response_future::ResponseFuture;
25use crate::connection::Connection;
26// Import error helpers for convenient error creation
27use crate::error_helpers::{connection_invalid, io_error, remote_error};
28use crate::net::channel::Channel;
29use crate::net::channel::ChannelInner;
30use crate::protocol::remoting_command::RemotingCommand;
31use crate::remoting::inner::RemotingGeneralHandler;
32use crate::remoting_server::rocketmq_tokio_server::Shutdown;
33use crate::runtime::connection_handler_context::ConnectionHandlerContext;
34use crate::runtime::connection_handler_context::ConnectionHandlerContextWrapper;
35use crate::runtime::processor::RequestProcessor;
36
37#[derive(Clone)]
38pub struct Client<PR> {
39    /// The TCP connection decorated with the rocketmq remoting protocol encoder / decoder
40    /// implemented using a buffered `TcpStream`.
41    ///
42    /// When `Listener` receives an inbound connection, the `TcpStream` is
43    /// passed to `Connection::new`, which initializes the associated buffers.
44    /// `Connection` allows the handler to operate at the "frame" level and keep
45    /// the byte level protocol parsing details encapsulated in `Connection`.
46    //connection: Connection,
47    inner: ArcMut<ClientInner<PR>>,
48    notify_shutdown: broadcast::Sender<()>,
49    tx: tokio::sync::mpsc::Sender<SendMessage>,
50}
51
52type SendMessage = (
53    RemotingCommand,
54    Option<tokio::sync::oneshot::Sender<RocketMQResult<RemotingCommand>>>,
55    Option<u64>,
56);
57
58struct ClientInner<PR> {
59    cmd_handler: ArcMut<RemotingGeneralHandler<PR>>,
60    ctx: ConnectionHandlerContext,
61    shutdown: Shutdown,
62}
63
64impl<PR> ClientInner<PR>
65where
66    PR: RequestProcessor + Sync + 'static,
67{
68    pub async fn connect<T>(
69        addr: T,
70        cmd_handler: ArcMut<RemotingGeneralHandler<PR>>,
71        tx: Option<&tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
72        notify: broadcast::Receiver<()>,
73    ) -> RocketMQResult<(
74        tokio::sync::mpsc::Sender<SendMessage>,
75        ArcMut<ClientInner<PR>>,
76    )>
77    where
78        T: tokio::net::ToSocketAddrs,
79    {
80        let tcp_stream = tokio::net::TcpStream::connect(addr).await;
81        if tcp_stream.is_err() {
82            return Err(io_error(tcp_stream.err().unwrap()));
83        }
84        let stream = tcp_stream?;
85        let local_addr = stream.local_addr()?;
86        let remote_address = stream.peer_addr()?;
87        let connection = Connection::new(stream);
88        let channel_inner = ArcMut::new(ChannelInner::new(
89            connection,
90            cmd_handler.response_table.clone(),
91        ));
92        let channel = Channel::new(channel_inner, local_addr, remote_address);
93        let (tx_, rx) = tokio::sync::mpsc::channel(1024);
94        let client = ClientInner {
95            cmd_handler,
96            ctx: ArcMut::new(ConnectionHandlerContextWrapper::new(
97                //connection,
98                channel.clone(),
99            )),
100            shutdown: Shutdown::new(notify),
101        };
102        let client_inner = ArcMut::new(client);
103        let mut client_ = client_inner.clone();
104        tokio::spawn(async move {
105            let _ = client_.run_recv().await;
106        });
107        let mut client_ = client_inner.clone();
108        tokio::spawn(async move {
109            client_.run_send(rx).await;
110        });
111
112        if let Some(tx) = tx {
113            let _ = tx.send(ConnectionNetEvent::CONNECTED(
114                client_inner.ctx.channel.remote_address(),
115            ));
116        }
117        Ok((tx_, client_inner))
118    }
119
120    async fn run_recv(&mut self) -> RocketMQResult<()> {
121        loop {
122            //Get the next frame from the connection.
123            let channel = self.ctx.channel_mut();
124            let frame = tokio::select! {
125                res = channel.connection_mut().receive_command() => res,
126                _ = self.shutdown.recv() =>{
127                    //If a shutdown signal is received, mark connection as closed
128                    channel.connection_mut().close();
129                    return Ok(());
130                }
131            };
132            let cmd = match frame {
133                Some(frame) => frame?,
134                None => {
135                    //If the frame is None, it means the connection is closed.
136                    //Connection state is automatically managed by I/O operations
137                    return Ok(());
138                }
139            };
140            //process request and response
141            self.cmd_handler
142                .process_message_received(&mut self.ctx, cmd)
143                .await;
144        }
145    }
146
147    async fn run_send(&mut self, mut rx: Receiver<SendMessage>) {
148        while let Some((request, tx, timeout)) = rx.recv().await {
149            let _ = self.send(request, tx, timeout).await;
150        }
151    }
152
153    pub async fn send(
154        &mut self,
155        request: RemotingCommand,
156        tx: Option<tokio::sync::oneshot::Sender<RocketMQResult<RemotingCommand>>>,
157        timeout_millis: Option<u64>,
158    ) -> RocketMQResult<()> {
159        let opaque = request.opaque();
160        if let Some(tx) = tx {
161            self.cmd_handler.response_table.insert(
162                opaque,
163                ResponseFuture::new(opaque, timeout_millis.unwrap_or(0), true, tx),
164            );
165        }
166        match self.ctx.connection_mut().send_command(request).await {
167            Ok(_) => Ok(()),
168            Err(error) => {
169                // For I/O errors, mark connection as invalid
170                if matches!(error, rocketmq_error::RocketMQError::IO(_)) {
171                    self.cmd_handler.response_table.remove(&opaque);
172                    return Err(connection_invalid(error.to_string()));
173                }
174                // For other errors, just remove the response future
175                self.cmd_handler.response_table.remove(&opaque);
176                Err(error)
177            }
178        }
179    }
180}
181
182impl<PR> Client<PR>
183where
184    PR: RequestProcessor + Sync + 'static,
185{
186    /// Creates a new `Client` instance and connects to the specified address.
187    ///
188    /// # Arguments
189    ///
190    /// * `addr` - The address to connect to.
191    ///
192    /// # Returns
193    ///
194    /// A new `Client` instance wrapped in a `Result`. Returns an error if the connection fails.
195    pub(crate) async fn connect<T>(
196        addr: T,
197        cmd_handler: ArcMut<RemotingGeneralHandler<PR>>,
198        tx: Option<&tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
199    ) -> RocketMQResult<Client<PR>>
200    where
201        T: tokio::net::ToSocketAddrs,
202    {
203        let (notify_shutdown, _) = broadcast::channel(1);
204        let receiver = notify_shutdown.subscribe();
205        let (tx, inner) = ClientInner::connect(addr, cmd_handler, tx, receiver).await?;
206        Ok(Client {
207            inner,
208            notify_shutdown,
209            tx,
210        })
211    }
212
213    /// Invokes a remote operation with the given `RemotingCommand`.
214    ///
215    /// # Arguments
216    ///
217    /// * `request` - The `RemotingCommand` representing the request.
218    ///
219    /// # Returns
220    ///
221    /// The `RemotingCommand` representing the response, wrapped in a `Result`. Returns an error if
222    /// the invocation fails.
223    pub async fn send_read(
224        &mut self,
225        request: RemotingCommand,
226        timeout_millis: u64,
227    ) -> RocketMQResult<RemotingCommand> {
228        let (tx, rx) = tokio::sync::oneshot::channel::<RocketMQResult<RemotingCommand>>();
229
230        if let Err(err) = self
231            .tx
232            .send((request, Some(tx), Some(timeout_millis)))
233            .await
234        {
235            return Err(remote_error(err.to_string()));
236        }
237        match rx.await {
238            Ok(value) => value,
239            Err(error) => Err(remote_error(error.to_string())),
240        }
241    }
242
243    /// Invokes a remote operation with the given `RemotingCommand` and provides a callback function
244    /// for handling the response.
245    ///
246    /// # Arguments
247    ///
248    /// * `_request` - The `RemotingCommand` representing the request.
249    /// * `_func` - The callback function to handle the response.
250    ///
251    /// This method is a placeholder and currently does not perform any functionality.
252    pub async fn invoke_with_callback<F>(&self, _request: RemotingCommand, _func: F)
253    where
254        F: FnMut(),
255    {
256    }
257
258    /// Sends a request to the remote remoting_server.
259    ///
260    /// # Arguments
261    ///
262    /// * `request` - The `RemotingCommand` representing the request.
263    ///
264    /// # Returns
265    ///
266    /// A `Result` indicating success or failure in sending the request.
267    pub async fn send(&mut self, request: RemotingCommand) -> RocketMQResult<()> {
268        if let Err(err) = self.tx.send((request, None, None)).await {
269            return Err(remote_error(err.to_string()));
270        }
271        Ok(())
272    }
273
274    /// Sends multiple requests in a batch (fire-and-forget, no response expected).
275    ///
276    /// # Performance
277    ///
278    /// Batching provides 2-4x throughput improvement for small messages:
279    /// - Single system call instead of N
280    /// - Better CPU cache locality during encoding
281    /// - Reduced Nagle algorithm delays
282    ///
283    /// # Use Cases
284    ///
285    /// - Log shipping (async, high volume)
286    /// - Metrics reporting
287    /// - Event publishing
288    ///
289    /// # Arguments
290    ///
291    /// * `requests` - Vector of commands to send (consumed)
292    ///
293    /// # Returns
294    ///
295    /// - `Ok(())`: All commands queued successfully
296    /// - `Err(e)`: Channel send error (client shutdown)
297    ///
298    /// # Example
299    ///
300    /// ```rust,ignore
301    /// let commands = vec![
302    ///     RemotingCommand::create_request_command(/*...*/),
303    ///     RemotingCommand::create_request_command(/*...*/),
304    /// ];
305    /// client.send_batch(commands).await?;
306    /// ```
307    pub async fn send_batch(&mut self, requests: Vec<RemotingCommand>) -> RocketMQResult<()> {
308        // Send all commands individually through the channel
309        // The underlying connection will buffer them efficiently
310        for request in requests {
311            if let Err(err) = self.tx.send((request, None, None)).await {
312                return Err(remote_error(err.to_string()));
313            }
314        }
315        Ok(())
316    }
317
318    /// Sends multiple requests and collects responses (request-response batch).
319    ///
320    /// # Performance vs send_read()
321    ///
322    /// ```text
323    /// 100x send_read():    ~5000ms  (sequential network RTT)
324    /// send_batch_read():   ~100ms   (parallel + single RTT)
325    /// Improvement: 50x faster
326    /// ```
327    ///
328    /// # Arguments
329    ///
330    /// * `requests` - Vector of commands expecting responses
331    /// * `timeout_millis` - Timeout for each individual request
332    ///
333    /// # Returns
334    ///
335    /// Vector of results in the same order as input requests
336    ///
337    /// # Example
338    ///
339    /// ```rust,ignore
340    /// let requests = vec![cmd1, cmd2, cmd3];
341    /// let responses = client.send_batch_read(requests, 3000).await?;
342    /// for response in responses {
343    ///     match response {
344    ///         Ok(cmd) => println!("Success: {:?}", cmd),
345    ///         Err(e) => eprintln!("Failed: {}", e),
346    ///     }
347    /// }
348    /// ```
349    pub async fn send_batch_read(
350        &mut self,
351        requests: Vec<RemotingCommand>,
352        timeout_millis: u64,
353    ) -> RocketMQResult<Vec<RocketMQResult<RemotingCommand>>> {
354        let mut receivers = Vec::with_capacity(requests.len());
355
356        // Send all requests and collect oneshot receivers
357        for request in requests {
358            let (tx, rx) = tokio::sync::oneshot::channel::<RocketMQResult<RemotingCommand>>();
359
360            if let Err(err) = self
361                .tx
362                .send((request, Some(tx), Some(timeout_millis)))
363                .await
364            {
365                return Err(remote_error(err.to_string()));
366            }
367
368            receivers.push(rx);
369        }
370
371        // Collect all responses
372        let mut results = Vec::with_capacity(receivers.len());
373        for rx in receivers {
374            let result = match rx.await {
375                Ok(value) => value,
376                Err(error) => Err(remote_error(error.to_string())),
377            };
378            results.push(result);
379        }
380
381        Ok(results)
382    }
383
384    /// Reads and retrieves the response from the remote remoting_server.
385    ///
386    /// # Returns
387    ///
388    /// The `RemotingCommand` representing the response, wrapped in a `Result`. Returns an error if
389    /// reading the response fails.
390    async fn read(&mut self) -> RocketMQResult<RemotingCommand> {
391        /*match self.inner.channel.0.connection.receive_command().await {
392            None => {
393                // Connection state is automatically managed by receive_command()
394                Err(ConnectionInvalid("connection disconnection".to_string()))
395            }
396            Some(result) => match result {
397                Ok(response) => Ok(response),
398                Err(error) => match error {
399                    Io(value) => {
400                        // Connection state is automatically marked degraded by I/O operations
401                        Err(ConnectionInvalid(value.to_string()))
402                    }
403                    _ => Err(error),
404                },
405            },
406        }*/
407        unimplemented!("read unimplemented")
408    }
409
410    pub fn connection(&self) -> &Connection {
411        self.inner.ctx.connection_ref()
412    }
413
414    pub fn connection_mut(&mut self) -> &mut Connection {
415        self.inner.ctx.connection_mut()
416    }
417}