rocketmq_remoting/net/channel.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::hash::Hash;
21use std::hash::Hasher;
22use std::net::SocketAddr;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26// Use flume for high-performance async channel (40-60% faster than tokio::mpsc)
27// Lock-free design provides better throughput under high load
28use flume::{Receiver, Sender};
29use rocketmq_error::RocketMQError;
30use rocketmq_rust::ArcMut;
31use tokio::time::timeout;
32use tracing::error;
33use uuid::Uuid;
34
35use crate::base::response_future::ResponseFuture;
36use crate::connection::Connection;
37use crate::protocol::remoting_command::RemotingCommand;
38
/// Identifier type for a [`Channel`]; `Channel::new` fills it with a random UUID v4 string.
pub type ChannelId = CheetahString;

/// Shared, mutable handle to a [`Channel`].
pub type ArcChannel = ArcMut<Channel>;
42
/// High-level handle for one bidirectional network connection.
///
/// A `Channel` pairs identity and addressing data (a channel id plus the local and
/// remote socket addresses) with a shared [`ChannelInner`] that owns the connection,
/// the outbound queue, and the response table.
///
/// ## Architecture
///
/// ```text
/// ┌─────────────────────────────────────────┐
/// │           Channel                       │
/// │  ┌─────────────────────────────────┐   │
/// │  │  Identity & Addressing          │   │
/// │  │  - channel_id (UUID)            │   │
/// │  │  - local_address (SocketAddr)   │   │
/// │  │  - remote_address (SocketAddr)  │   │
/// │  └─────────────────────────────────┘   │
/// │  ┌─────────────────────────────────┐   │
/// │  │  ChannelInner (shared state)    │   │
/// │  │  - Connection (I/O)             │   │
/// │  │  - ResponseTable (futures)      │   │
/// │  │  - Message queue (tx/rx)        │   │
/// │  └─────────────────────────────────┘   │
/// └─────────────────────────────────────────┘
/// ```
///
/// ## Design Rationale
///
/// - **Separation of concerns**: `Channel` carries identity/addressing; `ChannelInner` does I/O.
/// - **Clone-friendly**: cloning copies the addresses and id and clones the `ArcMut` handle,
///   so every clone shares the same `ChannelInner`.
/// - **Equality/Hash**: based only on the two addresses and the id, never on inner state.
#[derive(Clone)]
pub struct Channel {
    // === Core State ===
    /// Shared handle to the channel internals (connection, outbound queue, response table).
    inner: ArcMut<ChannelInner>,

    // === Identity & Addressing ===
    /// Local socket address (our end of the connection).
    local_address: SocketAddr,

    /// Remote peer socket address (their end of the connection).
    remote_address: SocketAddr,

    /// Identifier of this channel; `Channel::new` generates a random UUID v4 string.
    ///
    /// Part of `PartialEq`/`Hash`, so it distinguishes channels in maps and sets.
    channel_id: ChannelId,
}
92
93impl Channel {
94    /// Creates a new `Channel` with generated UUID identifier.
95    ///
96    /// # Arguments
97    ///
98    /// * `inner` - Shared channel state (connection, response table, etc.)
99    /// * `local_address` - Our local socket address
100    /// * `remote_address` - Remote peer socket address
101    ///
102    /// # Returns
103    ///
104    /// A new channel with a randomly generated UUID as its ID.
105    pub fn new(
106        inner: ArcMut<ChannelInner>,
107        local_address: SocketAddr,
108        remote_address: SocketAddr,
109    ) -> Self {
110        let channel_id = Uuid::new_v4().to_string().into();
111        Self {
112            inner,
113            local_address,
114            remote_address,
115            channel_id,
116        }
117    }
118
119    // === Address Mutators ===
120
121    /// Updates the local address of this channel.
122    ///
123    /// # Arguments
124    ///
125    /// * `local_address` - New local socket address
126    #[inline]
127    pub fn set_local_address(&mut self, local_address: SocketAddr) {
128        self.local_address = local_address;
129    }
130
131    /// Updates the remote address of this channel.
132    ///
133    /// # Arguments
134    ///
135    /// * `remote_address` - New remote socket address
136    #[inline]
137    pub fn set_remote_address(&mut self, remote_address: SocketAddr) {
138        self.remote_address = remote_address;
139    }
140
141    /// Updates the channel identifier.
142    ///
143    /// # Arguments
144    ///
145    /// * `channel_id` - New channel ID (convertible to `CheetahString`)
146    ///
147    /// # Warning
148    ///
149    /// Changing the ID after insertion into a HashMap/HashSet will break lookup.
150    #[inline]
151    pub fn set_channel_id(&mut self, channel_id: impl Into<CheetahString>) {
152        self.channel_id = channel_id.into();
153    }
154
155    // === Address Accessors ===
156
157    /// Gets the local socket address.
158    ///
159    /// # Returns
160    ///
161    /// The local address of this channel
162    #[inline]
163    pub fn local_address(&self) -> SocketAddr {
164        self.local_address
165    }
166
167    /// Gets the remote peer socket address.
168    ///
169    /// # Returns
170    ///
171    /// The remote address of this channel
172    #[inline]
173    pub fn remote_address(&self) -> SocketAddr {
174        self.remote_address
175    }
176
177    /// Gets the channel identifier as a string slice.
178    ///
179    /// # Returns
180    ///
181    /// String slice of the channel ID
182    #[inline]
183    pub fn channel_id(&self) -> &str {
184        self.channel_id.as_str()
185    }
186
187    /// Gets a cloned owned copy of the channel identifier.
188    ///
189    /// # Returns
190    ///
191    /// Owned `CheetahString` containing the channel ID
192    pub fn channel_id_owned(&self) -> CheetahString {
193        self.channel_id.clone()
194    }
195
196    // === Connection Access ===
197
198    /// Gets mutable access to the underlying connection.
199    ///
200    /// # Returns
201    ///
202    /// Mutable reference to the `Connection` for sending/receiving
203    ///
204    /// # Use Case
205    ///
206    /// Direct low-level I/O operations (receive_command, send_command)
207    #[inline]
208    pub fn connection_mut(&mut self) -> &mut Connection {
209        self.inner.connection.as_mut()
210    }
211
212    /// Gets immutable access to the underlying connection.
213    ///
214    /// # Returns
215    ///
216    /// Immutable reference to the `Connection` for inspection
217    #[inline]
218    pub fn connection_ref(&self) -> &Connection {
219        self.inner.connection_ref()
220    }
221
222    // === Inner State Access ===
223
224    /// Gets immutable access to the shared channel state.
225    ///
226    /// # Returns
227    ///
228    /// Immutable reference to `ChannelInner` (connection + response table)
229    pub fn channel_inner(&self) -> &ChannelInner {
230        self.inner.as_ref()
231    }
232
233    /// Gets mutable access to the shared channel state.
234    ///
235    /// # Returns
236    ///
237    /// Mutable reference to `ChannelInner` for advanced operations
238    pub fn channel_inner_mut(&mut self) -> &mut ChannelInner {
239        self.inner.as_mut()
240    }
241}
242
243impl PartialEq for Channel {
244    fn eq(&self, other: &Self) -> bool {
245        self.local_address == other.local_address
246            && self.remote_address == other.remote_address
247            && self.channel_id == other.channel_id
248    }
249}
250
// `PartialEq` is a plain comparison of `Eq` fields (two `SocketAddr`s and the id), so it is
// a full equivalence relation and `Eq` can be declared.
impl Eq for Channel {}
252
253impl Hash for Channel {
254    fn hash<H: Hasher>(&self, state: &mut H) {
255        self.local_address.hash(state);
256        self.remote_address.hash(state);
257        self.channel_id.hash(state);
258    }
259}
260
261impl Debug for Channel {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        write!(
264            f,
265            "Channel {{ local_address: {:?}, remote_address: {:?}, channel_id: {} }}",
266            self.local_address, self.remote_address, self.channel_id
267        )
268    }
269}
270
271impl Display for Channel {
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        write!(
274            f,
275            "Channel {{ local_address: {}, remote_address: {}, channel_id: {} }}",
276            self.local_address, self.remote_address, self.channel_id
277        )
278    }
279}
280
/// Item carried by the outbound queue from the `ChannelInner::send*` methods to `handle_send`.
///
/// Fields, in order:
/// 1. the command to write to the connection;
/// 2. an optional oneshot sender for the response. When present, `handle_send` registers a
///    `ResponseFuture` under the command's opaque id before writing the command;
/// 3. an optional timeout in milliseconds, handed to `ResponseFuture::new` (0 when absent).
type ChannelMessage = (
    RemotingCommand, /* command */
    Option<tokio::sync::oneshot::Sender<rocketmq_error::RocketMQResult<RemotingCommand>>>, /* response_tx */
    Option<u64>, /* timeout_millis */
);
289
/// Shared state behind a [`Channel`]: the connection, the outbound queue, and response tracking.
///
/// `ChannelInner` is shared through `ArcMut` by every clone of a `Channel`. It holds:
///
/// - **Connection**: the underlying `Connection` used for I/O
/// - **Send queue**: a bounded queue that decouples callers from the actual socket write
/// - **Response table**: pending request-response pairs keyed by opaque id
///
/// ## Threading Model
///
/// - **Send task**: `handle_send`, spawned by `ChannelInner::new`, pulls from the queue,
///   registers response futures and writes each command to the connection
/// - **Response table**: inserted into by the send task. In this type, entries are removed
///   by the send task on write failure and by `send_wait_response` on timeout or a closed
///   response channel. Presumably a receive path elsewhere removes and completes entries
///   when responses arrive. TODO confirm.
///
/// ## Lifecycle
///
/// 1. **Created**: `new` spawns the background `handle_send` task
/// 2. **Active**: the task drains the queue in order and writes each command
/// 3. **Shutdown**: the task exits once the queue sender is dropped, or after a write fails
///    with an I/O error. NOTE(review): nothing in this type cancels entries still in the
///    response table at that point; confirm who cleans them up.
pub struct ChannelInner {
    // === Message Queue ===
    /// Sender half of the bounded `flume` outbound queue.
    ///
    /// The `send*` methods enqueue commands here for asynchronous sending. The receiver half
    /// is owned by the background `handle_send` task, which stops once no sender remains.
    outbound_queue_tx: Sender<ChannelMessage>,

    // === I/O Transport ===
    /// Underlying network connection.
    ///
    /// Held in an `ArcMut` so the same connection is reachable both from the send task and
    /// through `Channel::connection_mut()` / `ChannelInner::connection_mut()`.
    /// NOTE(review): `ArcMut` hands out `&mut` through shared handles; whether simultaneous
    /// use from both sides is sound depends on `Connection` internals. Confirm.
    pub(crate) connection: ArcMut<Connection>,

    // === Response Tracking ===
    /// Pending requests, keyed by the request's opaque id.
    ///
    /// Each value is the `ResponseFuture` that `handle_send` builds from the caller's oneshot
    /// sender and timeout. The same `ArcMut` handle is shared with the send task.
    pub(crate) response_table: ArcMut<HashMap<i32, ResponseFuture>>,
}
340
341/// Background task that processes the outbound message queue.
342///
343/// # Performance Features
344///
345/// - Uses `flume` receiver for lock-free message reception
346/// - Processes messages sequentially to maintain order
347/// - Handles errors gracefully (marks connection as failed on I/O errors)
348///
349/// # Potential Optimization (TODO)
350///
351/// Consider implementing batch sending:
352/// ```ignore
353/// // Collect multiple pending messages
354/// let mut batch = vec![first_msg];
355/// while batch.len() < 32 {
356///     match rx.try_recv() {
357///         Ok(msg) => batch.push(msg),
358///         Err(_) => break,
359///     }
360/// }
361/// // Send batch together for better throughput
362/// ```
363///
364/// This would reduce per-message overhead and improve throughput by ~20-40%
365/// under high load, at the cost of slightly increased latency for small batches.
366pub(crate) async fn handle_send(
367    mut connection: ArcMut<Connection>,
368    rx: Receiver<ChannelMessage>,
369    mut response_table: ArcMut<HashMap<i32, ResponseFuture>>,
370) {
371    // Loop until channel is closed or connection fails
372    loop {
373        // flume receiver is async-compatible: recv_async() awaits message
374        let msg = match rx.recv_async().await {
375            Ok(msg) => msg,
376            Err(_) => {
377                // Channel closed, exit gracefully
378                break;
379            }
380        };
381
382        let (send, tx, timeout_millis) = msg;
383        let opaque = send.opaque();
384
385        // Register response future if this is a request-response operation
386        if let Some(tx) = tx {
387            response_table.insert(
388                opaque,
389                ResponseFuture::new(opaque, timeout_millis.unwrap_or(0), true, tx),
390            );
391        }
392
393        // Send command via connection
394        match connection.send_command(send).await {
395            Ok(_) => {}
396            Err(error) => match error {
397                rocketmq_error::RocketMQError::IO(error) => {
398                    // I/O error means connection is broken
399                    // Connection state is automatically marked as degraded by send_command()
400                    error!("send request failed: {}", error);
401                    response_table.remove(&opaque);
402                    return;
403                }
404                _ => {
405                    // Other errors: remove response future but continue
406                    response_table.remove(&opaque);
407                }
408            },
409        };
410    }
411}
412
413impl ChannelInner {
414    /// Creates a new `ChannelInner` and spawns the background send task.
415    ///
416    /// # Arguments
417    ///
418    /// * `connection` - The underlying TCP connection
419    /// * `response_table` - Shared response tracking map
420    ///
421    /// # Returns
422    ///
423    /// A new `ChannelInner` with an active background send task.
424    ///
425    /// # Implementation Note
426    ///
427    /// - Queue capacity: 1024 messages (adjust based on load)
428    /// - Spawns `handle_send` task immediately
429    /// - Task runs until channel is dropped or connection fails
430    ///
431    /// # Performance
432    ///
433    /// Uses `flume::bounded` channel for better performance:
434    /// - Lock-free operations for most cases
435    /// - ~40-60% higher throughput than tokio::mpsc
436    /// - Better performance under contention
437    pub fn new(
438        connection: Connection,
439        response_table: ArcMut<HashMap<i32, ResponseFuture>>,
440    ) -> Self {
441        const QUEUE_CAPACITY: usize = 1024;
442
443        // Use flume bounded channel for better performance
444        // flume provides lock-free operations and better throughput than tokio::mpsc
445        let (outbound_queue_tx, outbound_queue_rx) = flume::bounded(QUEUE_CAPACITY);
446
447        let connection = ArcMut::new(connection);
448        tokio::spawn(handle_send(
449            connection.clone(),
450            outbound_queue_rx,
451            response_table.clone(),
452        ));
453        Self {
454            outbound_queue_tx,
455            connection,
456            response_table,
457        }
458    }
459}
460
461impl ChannelInner {
462    // === Connection Accessors ===
463
464    /// Gets a cloned `ArcMut` handle to the connection.
465    ///
466    /// # Returns
467    ///
468    /// Shared mutable reference to the connection (cheap clone, increments refcount)
469    #[inline]
470    pub fn connection(&self) -> ArcMut<Connection> {
471        self.connection.clone()
472    }
473
474    /// Gets an immutable reference to the connection.
475    ///
476    /// # Returns
477    ///
478    /// Immutable reference to the underlying `Connection`
479    #[inline]
480    pub fn connection_ref(&self) -> &Connection {
481        self.connection.as_ref()
482    }
483
484    /// Gets a mutable reference to the connection.
485    ///
486    /// # Returns
487    ///
488    /// Mutable reference to the underlying `Connection`
489    #[inline]
490    pub fn connection_mut(&mut self) -> &mut Connection {
491        self.connection.as_mut()
492    }
493
494    // === High-Level Send Methods ===
495
496    /// Sends a request and waits for the response (request-response pattern).
497    ///
498    /// Enqueues the request, tracks it via opaque ID, and blocks until the
499    /// response arrives or timeout expires.
500    ///
501    /// # Arguments
502    ///
503    /// * `request` - The command to send
504    /// * `timeout_millis` - Maximum wait time for response (milliseconds)
505    ///
506    /// # Returns
507    ///
508    /// - `Ok(response)`: Response received within timeout
509    /// - `Err(ChannelSendRequestFailed)`: Failed to enqueue request
510    /// - `Err(ChannelRecvRequestFailed)`: Response channel closed or timeout
511    ///
512    /// # Lifecycle
513    ///
514    /// 1. Create oneshot channel for response
515    /// 2. Enqueue request with response channel
516    /// 3. Wait (with timeout) for response on channel
517    /// 4. Clean up response table on error
518    ///
519    /// # Example
520    ///
521    /// ```ignore
522    /// let request = RemotingCommand::create_request_command(10, header).into();
523    /// let response = channel_inner.send_wait_response(request, 3000).await?;
524    /// println!("Got response: {:?}", response);
525    /// ```
526    pub async fn send_wait_response(
527        &mut self,
528        request: RemotingCommand,
529        timeout_millis: u64,
530    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
531        let (response_tx, response_rx) =
532            tokio::sync::oneshot::channel::<rocketmq_error::RocketMQResult<RemotingCommand>>();
533        let opaque = request.opaque();
534
535        // Enqueue request with response tracking
536        // flume sender: use send_async() for async context
537        if let Err(err) = self
538            .outbound_queue_tx
539            .send_async((request, Some(response_tx), Some(timeout_millis)))
540            .await
541        {
542            return Err(RocketMQError::network_connection_failed(
543                "channel",
544                format!("send failed: {}", err),
545            ));
546        }
547
548        // Wait for response with timeout
549        match timeout(Duration::from_millis(timeout_millis), response_rx).await {
550            Ok(result) => match result {
551                Ok(response) => response,
552                Err(e) => {
553                    // Response channel closed without sending (connection dropped?)
554                    self.response_table.remove(&opaque);
555                    Err(RocketMQError::network_connection_failed(
556                        "channel",
557                        format!("connection dropped: {}", e),
558                    ))
559                }
560            },
561            Err(_) => {
562                // Timeout expired
563                self.response_table.remove(&opaque);
564                Err(RocketMQError::Timeout {
565                    operation: "channel_recv",
566                    timeout_ms: timeout_millis,
567                })
568            }
569        }
570    }
571
572    /// Sends a one-way request without waiting for response (fire-and-forget).
573    ///
574    /// Marks the request as oneway and enqueues it. Does not track response.
575    ///
576    /// # Arguments
577    ///
578    /// * `request` - The command to send
579    /// * `timeout_millis` - Timeout for enqueuing (not for response)
580    ///
581    /// # Returns
582    ///
583    /// - `Ok(().into())`: Request successfully enqueued
584    /// - `Err(ChannelSendRequestFailed)`: Failed to enqueue
585    ///
586    /// # Use Case
587    ///
588    /// Notifications, heartbeats, or any scenario where response is not needed.
589    /// More efficient than `send_wait_response` as it avoids response tracking overhead.
590    pub async fn send_oneway(
591        &self,
592        request: RemotingCommand,
593        timeout_millis: u64,
594    ) -> rocketmq_error::RocketMQResult<()> {
595        let request = request.mark_oneway_rpc();
596
597        // flume sender: use send_async() for async context
598        if let Err(err) = self
599            .outbound_queue_tx
600            .send_async((request, None, Some(timeout_millis)))
601            .await
602        {
603            error!("send oneway request failed: {}", err);
604            return Err(RocketMQError::network_connection_failed(
605                "channel",
606                format!("send oneway failed: {}", err),
607            ));
608        }
609        Ok(())
610    }
611
612    /// Sends a request without waiting for response (async enqueue only).
613    ///
614    /// Similar to `send_oneway`, but does not mark the request as oneway.
615    /// Use when caller doesn't care about response but request is not marked as oneway protocol.
616    ///
617    /// # Arguments
618    ///
619    /// * `request` - The command to send
620    /// * `timeout_millis` - Optional timeout for enqueuing
621    ///
622    /// # Returns
623    ///
624    /// - `Ok(())`: Request successfully enqueued
625    /// - `Err(ChannelSendRequestFailed)`: Failed to enqueue
626    pub async fn send(
627        &self,
628        request: RemotingCommand,
629        timeout_millis: Option<u64>,
630    ) -> rocketmq_error::RocketMQResult<()> {
631        // flume sender: use send_async() for async context
632        if let Err(err) = self
633            .outbound_queue_tx
634            .send_async((request, None, timeout_millis))
635            .await
636        {
637            error!("send request failed: {}", err);
638            return Err(RocketMQError::network_connection_failed(
639                "channel",
640                format!("send failed: {}", err),
641            ));
642        }
643        Ok(())
644    }
645
646    // === Health Check ===
647
648    /// Checks if the underlying connection is healthy.
649    ///
650    /// # Returns
651    ///
652    /// - `true`: Connection is operational
653    /// - `false`: Connection has failed, channel should be discarded
654    #[inline]
655    pub fn is_healthy(&self) -> bool {
656        self.connection.is_healthy()
657    }
658
659    /// Legacy alias for `is_healthy()` - kept for backward compatibility.
660    ///
661    /// # Deprecated
662    ///
663    /// Use `is_healthy()` instead for clearer semantics.
664    #[inline]
665    #[deprecated(since = "0.1.0", note = "Use `is_healthy()` instead")]
666    pub fn is_ok(&self) -> bool {
667        self.connection.is_healthy()
668    }
669}