rocketmq_remoting/net/channel.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::hash::Hash;
21use std::hash::Hasher;
22use std::net::SocketAddr;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26// Use flume for high-performance async channel (40-60% faster than tokio::mpsc)
27// Lock-free design provides better throughput under high load
28use flume::{Receiver, Sender};
29use rocketmq_error::RocketMQError;
30use rocketmq_rust::ArcMut;
31use tokio::time::timeout;
32use tracing::error;
33use uuid::Uuid;
34
35use crate::base::response_future::ResponseFuture;
36use crate::connection::Connection;
37use crate::protocol::remoting_command::RemotingCommand;
38
/// Identifier of a [`Channel`]; an alias for `CheetahString`.
pub type ChannelId = CheetahString;
40
/// Shared handle to a [`Channel`]; an alias for `ArcMut<Channel>`.
pub type ArcChannel = ArcMut<Channel>;
42
/// High-level abstraction over a bidirectional network connection.
///
/// A `Channel` pairs an identity (channel ID plus local/remote socket addresses)
/// with a handle to the shared [`ChannelInner`], which holds the connection, the
/// sender half of the outbound queue, and the response table.
///
/// ## Architecture
///
/// ```text
/// ┌───────────────────────────────────────┐
/// │ Channel                               │
/// │  ┌─────────────────────────────────┐  │
/// │  │ Identity & Addressing           │  │
/// │  │  - channel_id (UUID)            │  │
/// │  │  - local_address (SocketAddr)   │  │
/// │  │  - remote_address (SocketAddr)  │  │
/// │  └─────────────────────────────────┘  │
/// │  ┌─────────────────────────────────┐  │
/// │  │ ChannelInner (shared state)     │  │
/// │  │  - Connection (I/O)             │  │
/// │  │  - ResponseTable (futures)      │  │
/// │  │  - Outbound queue (tx)          │  │
/// │  └─────────────────────────────────┘  │
/// └───────────────────────────────────────┘
/// ```
///
/// ## Design Notes
///
/// - **Separation of concerns**: identity and addressing live in `Channel`; I/O and
///   response tracking live in `ChannelInner`.
/// - **Clone**: the derived `Clone` copies the addresses and the ID and clones the
///   `ArcMut` handle to the inner state.
/// - **Equality/Hash**: computed from the two addresses and the ID only; `inner`
///   does not participate.
#[derive(Clone)]
pub struct Channel {
    // === Core State ===
    /// Handle to the channel internals (connection, send queue, response table).
    inner: ArcMut<ChannelInner>,

    // === Identity & Addressing ===
    /// Local socket address (our end of the connection).
    local_address: SocketAddr,

    /// Remote peer socket address (their end of the connection).
    remote_address: SocketAddr,

    /// Identifier of this channel instance.
    ///
    /// `Channel::new` fills it with a freshly generated UUID string; it is part of
    /// the `PartialEq`/`Hash` identity and is printed by `Debug`/`Display`.
    channel_id: ChannelId,
}
92
93impl Channel {
94 /// Creates a new `Channel` with generated UUID identifier.
95 ///
96 /// # Arguments
97 ///
98 /// * `inner` - Shared channel state (connection, response table, etc.)
99 /// * `local_address` - Our local socket address
100 /// * `remote_address` - Remote peer socket address
101 ///
102 /// # Returns
103 ///
104 /// A new channel with a randomly generated UUID as its ID.
105 pub fn new(
106 inner: ArcMut<ChannelInner>,
107 local_address: SocketAddr,
108 remote_address: SocketAddr,
109 ) -> Self {
110 let channel_id = Uuid::new_v4().to_string().into();
111 Self {
112 inner,
113 local_address,
114 remote_address,
115 channel_id,
116 }
117 }
118
119 // === Address Mutators ===
120
121 /// Updates the local address of this channel.
122 ///
123 /// # Arguments
124 ///
125 /// * `local_address` - New local socket address
126 #[inline]
127 pub fn set_local_address(&mut self, local_address: SocketAddr) {
128 self.local_address = local_address;
129 }
130
131 /// Updates the remote address of this channel.
132 ///
133 /// # Arguments
134 ///
135 /// * `remote_address` - New remote socket address
136 #[inline]
137 pub fn set_remote_address(&mut self, remote_address: SocketAddr) {
138 self.remote_address = remote_address;
139 }
140
141 /// Updates the channel identifier.
142 ///
143 /// # Arguments
144 ///
145 /// * `channel_id` - New channel ID (convertible to `CheetahString`)
146 ///
147 /// # Warning
148 ///
149 /// Changing the ID after insertion into a HashMap/HashSet will break lookup.
150 #[inline]
151 pub fn set_channel_id(&mut self, channel_id: impl Into<CheetahString>) {
152 self.channel_id = channel_id.into();
153 }
154
155 // === Address Accessors ===
156
157 /// Gets the local socket address.
158 ///
159 /// # Returns
160 ///
161 /// The local address of this channel
162 #[inline]
163 pub fn local_address(&self) -> SocketAddr {
164 self.local_address
165 }
166
167 /// Gets the remote peer socket address.
168 ///
169 /// # Returns
170 ///
171 /// The remote address of this channel
172 #[inline]
173 pub fn remote_address(&self) -> SocketAddr {
174 self.remote_address
175 }
176
177 /// Gets the channel identifier as a string slice.
178 ///
179 /// # Returns
180 ///
181 /// String slice of the channel ID
182 #[inline]
183 pub fn channel_id(&self) -> &str {
184 self.channel_id.as_str()
185 }
186
187 /// Gets a cloned owned copy of the channel identifier.
188 ///
189 /// # Returns
190 ///
191 /// Owned `CheetahString` containing the channel ID
192 pub fn channel_id_owned(&self) -> CheetahString {
193 self.channel_id.clone()
194 }
195
196 // === Connection Access ===
197
198 /// Gets mutable access to the underlying connection.
199 ///
200 /// # Returns
201 ///
202 /// Mutable reference to the `Connection` for sending/receiving
203 ///
204 /// # Use Case
205 ///
206 /// Direct low-level I/O operations (receive_command, send_command)
207 #[inline]
208 pub fn connection_mut(&mut self) -> &mut Connection {
209 self.inner.connection.as_mut()
210 }
211
212 /// Gets immutable access to the underlying connection.
213 ///
214 /// # Returns
215 ///
216 /// Immutable reference to the `Connection` for inspection
217 #[inline]
218 pub fn connection_ref(&self) -> &Connection {
219 self.inner.connection_ref()
220 }
221
222 // === Inner State Access ===
223
224 /// Gets immutable access to the shared channel state.
225 ///
226 /// # Returns
227 ///
228 /// Immutable reference to `ChannelInner` (connection + response table)
229 pub fn channel_inner(&self) -> &ChannelInner {
230 self.inner.as_ref()
231 }
232
233 /// Gets mutable access to the shared channel state.
234 ///
235 /// # Returns
236 ///
237 /// Mutable reference to `ChannelInner` for advanced operations
238 pub fn channel_inner_mut(&mut self) -> &mut ChannelInner {
239 self.inner.as_mut()
240 }
241}
242
243impl PartialEq for Channel {
244 fn eq(&self, other: &Self) -> bool {
245 self.local_address == other.local_address
246 && self.remote_address == other.remote_address
247 && self.channel_id == other.channel_id
248 }
249}
250
// `PartialEq` for `Channel` compares the two socket addresses and the channel ID.
// This marker impl declares that comparison a full equivalence relation, which is
// what `HashMap`/`HashSet` keys require.
impl Eq for Channel {}
252
253impl Hash for Channel {
254 fn hash<H: Hasher>(&self, state: &mut H) {
255 self.local_address.hash(state);
256 self.remote_address.hash(state);
257 self.channel_id.hash(state);
258 }
259}
260
261impl Debug for Channel {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 write!(
264 f,
265 "Channel {{ local_address: {:?}, remote_address: {:?}, channel_id: {} }}",
266 self.local_address, self.remote_address, self.channel_id
267 )
268 }
269}
270
271impl Display for Channel {
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 write!(
274 f,
275 "Channel {{ local_address: {}, remote_address: {}, channel_id: {} }}",
276 self.local_address, self.remote_address, self.channel_id
277 )
278 }
279}
280
/// One entry of the outbound send queue consumed by `handle_send`.
///
/// Tuple layout:
///
/// - **command**: the `RemotingCommand` to write to the connection.
/// - **response_tx**: `Some(sender)` when the caller awaits a response, in which case
///   `handle_send` registers it in the response table under the command's opaque ID;
///   `None` for sends that are not tracked.
/// - **timeout_millis**: optional timeout in milliseconds stored in the registered
///   `ResponseFuture` (`0` is used when it is `None`).
type ChannelMessage = (
    RemotingCommand, /* command */
    Option<tokio::sync::oneshot::Sender<rocketmq_error::RocketMQResult<RemotingCommand>>>, /* response_tx */
    Option<u64>, /* timeout_millis */
);
289
/// Shared state behind a [`Channel`]: the outbound queue, the connection, and the
/// table of pending responses.
///
/// `Channel` holds this type through `ArcMut<ChannelInner>`.
///
/// ## Parts
///
/// - **Connection**: the `Connection` that commands are written to.
/// - **Send queue**: callers enqueue `ChannelMessage`s; the `handle_send` task
///   spawned by `ChannelInner::new` dequeues them and writes them to the connection.
/// - **Response table**: pending request/response pairs keyed by opaque ID.
///
/// ## Who touches the response table (as visible in this file)
///
/// - `handle_send` inserts an entry before writing a tracked request and removes it
///   again when the write fails.
/// - `send_wait_response` removes the entry when the wait times out or the response
///   sender is dropped.
/// - Completing an entry when a response arrives happens outside this file
///   (presumably in the receive path — confirm against the caller).
///
/// ## Lifecycle
///
/// 1. **Created**: `ChannelInner::new` spawns the background `handle_send` task.
/// 2. **Active**: the task drains the queue and writes commands.
/// 3. **Stopped**: the task ends when receiving from the queue fails or a write
///    returns an I/O error.
pub struct ChannelInner {
    // === Message Queue ===
    /// Sender half of the bounded `flume` queue feeding `handle_send`.
    ///
    /// The send methods on this type enqueue `ChannelMessage`s here; the receiver
    /// half is moved into the `handle_send` task by `ChannelInner::new`.
    outbound_queue_tx: Sender<ChannelMessage>,

    // === I/O Transport ===
    /// Underlying network connection.
    ///
    /// Kept in an `ArcMut` because a clone of this handle is given to the
    /// `handle_send` task while this struct keeps its own for the accessor methods.
    pub(crate) connection: ArcMut<Connection>,

    // === Response Tracking ===
    /// Pending requests: opaque ID → `ResponseFuture`.
    ///
    /// A clone of this handle is given to the `handle_send` task, which inserts an
    /// entry for each tracked request before writing it.
    pub(crate) response_table: ArcMut<HashMap<i32, ResponseFuture>>,
}
340
341/// Background task that processes the outbound message queue.
342///
343/// # Performance Features
344///
345/// - Uses `flume` receiver for lock-free message reception
346/// - Processes messages sequentially to maintain order
347/// - Handles errors gracefully (marks connection as failed on I/O errors)
348///
349/// # Potential Optimization (TODO)
350///
351/// Consider implementing batch sending:
352/// ```ignore
353/// // Collect multiple pending messages
354/// let mut batch = vec![first_msg];
355/// while batch.len() < 32 {
356/// match rx.try_recv() {
357/// Ok(msg) => batch.push(msg),
358/// Err(_) => break,
359/// }
360/// }
361/// // Send batch together for better throughput
362/// ```
363///
364/// This would reduce per-message overhead and improve throughput by ~20-40%
365/// under high load, at the cost of slightly increased latency for small batches.
366pub(crate) async fn handle_send(
367 mut connection: ArcMut<Connection>,
368 rx: Receiver<ChannelMessage>,
369 mut response_table: ArcMut<HashMap<i32, ResponseFuture>>,
370) {
371 // Loop until channel is closed or connection fails
372 loop {
373 // flume receiver is async-compatible: recv_async() awaits message
374 let msg = match rx.recv_async().await {
375 Ok(msg) => msg,
376 Err(_) => {
377 // Channel closed, exit gracefully
378 break;
379 }
380 };
381
382 let (send, tx, timeout_millis) = msg;
383 let opaque = send.opaque();
384
385 // Register response future if this is a request-response operation
386 if let Some(tx) = tx {
387 response_table.insert(
388 opaque,
389 ResponseFuture::new(opaque, timeout_millis.unwrap_or(0), true, tx),
390 );
391 }
392
393 // Send command via connection
394 match connection.send_command(send).await {
395 Ok(_) => {}
396 Err(error) => match error {
397 rocketmq_error::RocketMQError::IO(error) => {
398 // I/O error means connection is broken
399 // Connection state is automatically marked as degraded by send_command()
400 error!("send request failed: {}", error);
401 response_table.remove(&opaque);
402 return;
403 }
404 _ => {
405 // Other errors: remove response future but continue
406 response_table.remove(&opaque);
407 }
408 },
409 };
410 }
411}
412
413impl ChannelInner {
414 /// Creates a new `ChannelInner` and spawns the background send task.
415 ///
416 /// # Arguments
417 ///
418 /// * `connection` - The underlying TCP connection
419 /// * `response_table` - Shared response tracking map
420 ///
421 /// # Returns
422 ///
423 /// A new `ChannelInner` with an active background send task.
424 ///
425 /// # Implementation Note
426 ///
427 /// - Queue capacity: 1024 messages (adjust based on load)
428 /// - Spawns `handle_send` task immediately
429 /// - Task runs until channel is dropped or connection fails
430 ///
431 /// # Performance
432 ///
433 /// Uses `flume::bounded` channel for better performance:
434 /// - Lock-free operations for most cases
435 /// - ~40-60% higher throughput than tokio::mpsc
436 /// - Better performance under contention
437 pub fn new(
438 connection: Connection,
439 response_table: ArcMut<HashMap<i32, ResponseFuture>>,
440 ) -> Self {
441 const QUEUE_CAPACITY: usize = 1024;
442
443 // Use flume bounded channel for better performance
444 // flume provides lock-free operations and better throughput than tokio::mpsc
445 let (outbound_queue_tx, outbound_queue_rx) = flume::bounded(QUEUE_CAPACITY);
446
447 let connection = ArcMut::new(connection);
448 tokio::spawn(handle_send(
449 connection.clone(),
450 outbound_queue_rx,
451 response_table.clone(),
452 ));
453 Self {
454 outbound_queue_tx,
455 connection,
456 response_table,
457 }
458 }
459}
460
461impl ChannelInner {
462 // === Connection Accessors ===
463
464 /// Gets a cloned `ArcMut` handle to the connection.
465 ///
466 /// # Returns
467 ///
468 /// Shared mutable reference to the connection (cheap clone, increments refcount)
469 #[inline]
470 pub fn connection(&self) -> ArcMut<Connection> {
471 self.connection.clone()
472 }
473
474 /// Gets an immutable reference to the connection.
475 ///
476 /// # Returns
477 ///
478 /// Immutable reference to the underlying `Connection`
479 #[inline]
480 pub fn connection_ref(&self) -> &Connection {
481 self.connection.as_ref()
482 }
483
484 /// Gets a mutable reference to the connection.
485 ///
486 /// # Returns
487 ///
488 /// Mutable reference to the underlying `Connection`
489 #[inline]
490 pub fn connection_mut(&mut self) -> &mut Connection {
491 self.connection.as_mut()
492 }
493
494 // === High-Level Send Methods ===
495
496 /// Sends a request and waits for the response (request-response pattern).
497 ///
498 /// Enqueues the request, tracks it via opaque ID, and blocks until the
499 /// response arrives or timeout expires.
500 ///
501 /// # Arguments
502 ///
503 /// * `request` - The command to send
504 /// * `timeout_millis` - Maximum wait time for response (milliseconds)
505 ///
506 /// # Returns
507 ///
508 /// - `Ok(response)`: Response received within timeout
509 /// - `Err(ChannelSendRequestFailed)`: Failed to enqueue request
510 /// - `Err(ChannelRecvRequestFailed)`: Response channel closed or timeout
511 ///
512 /// # Lifecycle
513 ///
514 /// 1. Create oneshot channel for response
515 /// 2. Enqueue request with response channel
516 /// 3. Wait (with timeout) for response on channel
517 /// 4. Clean up response table on error
518 ///
519 /// # Example
520 ///
521 /// ```ignore
522 /// let request = RemotingCommand::create_request_command(10, header).into();
523 /// let response = channel_inner.send_wait_response(request, 3000).await?;
524 /// println!("Got response: {:?}", response);
525 /// ```
526 pub async fn send_wait_response(
527 &mut self,
528 request: RemotingCommand,
529 timeout_millis: u64,
530 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
531 let (response_tx, response_rx) =
532 tokio::sync::oneshot::channel::<rocketmq_error::RocketMQResult<RemotingCommand>>();
533 let opaque = request.opaque();
534
535 // Enqueue request with response tracking
536 // flume sender: use send_async() for async context
537 if let Err(err) = self
538 .outbound_queue_tx
539 .send_async((request, Some(response_tx), Some(timeout_millis)))
540 .await
541 {
542 return Err(RocketMQError::network_connection_failed(
543 "channel",
544 format!("send failed: {}", err),
545 ));
546 }
547
548 // Wait for response with timeout
549 match timeout(Duration::from_millis(timeout_millis), response_rx).await {
550 Ok(result) => match result {
551 Ok(response) => response,
552 Err(e) => {
553 // Response channel closed without sending (connection dropped?)
554 self.response_table.remove(&opaque);
555 Err(RocketMQError::network_connection_failed(
556 "channel",
557 format!("connection dropped: {}", e),
558 ))
559 }
560 },
561 Err(_) => {
562 // Timeout expired
563 self.response_table.remove(&opaque);
564 Err(RocketMQError::Timeout {
565 operation: "channel_recv",
566 timeout_ms: timeout_millis,
567 })
568 }
569 }
570 }
571
572 /// Sends a one-way request without waiting for response (fire-and-forget).
573 ///
574 /// Marks the request as oneway and enqueues it. Does not track response.
575 ///
576 /// # Arguments
577 ///
578 /// * `request` - The command to send
579 /// * `timeout_millis` - Timeout for enqueuing (not for response)
580 ///
581 /// # Returns
582 ///
583 /// - `Ok(().into())`: Request successfully enqueued
584 /// - `Err(ChannelSendRequestFailed)`: Failed to enqueue
585 ///
586 /// # Use Case
587 ///
588 /// Notifications, heartbeats, or any scenario where response is not needed.
589 /// More efficient than `send_wait_response` as it avoids response tracking overhead.
590 pub async fn send_oneway(
591 &self,
592 request: RemotingCommand,
593 timeout_millis: u64,
594 ) -> rocketmq_error::RocketMQResult<()> {
595 let request = request.mark_oneway_rpc();
596
597 // flume sender: use send_async() for async context
598 if let Err(err) = self
599 .outbound_queue_tx
600 .send_async((request, None, Some(timeout_millis)))
601 .await
602 {
603 error!("send oneway request failed: {}", err);
604 return Err(RocketMQError::network_connection_failed(
605 "channel",
606 format!("send oneway failed: {}", err),
607 ));
608 }
609 Ok(())
610 }
611
612 /// Sends a request without waiting for response (async enqueue only).
613 ///
614 /// Similar to `send_oneway`, but does not mark the request as oneway.
615 /// Use when caller doesn't care about response but request is not marked as oneway protocol.
616 ///
617 /// # Arguments
618 ///
619 /// * `request` - The command to send
620 /// * `timeout_millis` - Optional timeout for enqueuing
621 ///
622 /// # Returns
623 ///
624 /// - `Ok(())`: Request successfully enqueued
625 /// - `Err(ChannelSendRequestFailed)`: Failed to enqueue
626 pub async fn send(
627 &self,
628 request: RemotingCommand,
629 timeout_millis: Option<u64>,
630 ) -> rocketmq_error::RocketMQResult<()> {
631 // flume sender: use send_async() for async context
632 if let Err(err) = self
633 .outbound_queue_tx
634 .send_async((request, None, timeout_millis))
635 .await
636 {
637 error!("send request failed: {}", err);
638 return Err(RocketMQError::network_connection_failed(
639 "channel",
640 format!("send failed: {}", err),
641 ));
642 }
643 Ok(())
644 }
645
646 // === Health Check ===
647
648 /// Checks if the underlying connection is healthy.
649 ///
650 /// # Returns
651 ///
652 /// - `true`: Connection is operational
653 /// - `false`: Connection has failed, channel should be discarded
654 #[inline]
655 pub fn is_healthy(&self) -> bool {
656 self.connection.is_healthy()
657 }
658
659 /// Legacy alias for `is_healthy()` - kept for backward compatibility.
660 ///
661 /// # Deprecated
662 ///
663 /// Use `is_healthy()` instead for clearer semantics.
664 #[inline]
665 #[deprecated(since = "0.1.0", note = "Use `is_healthy()` instead")]
666 pub fn is_ok(&self) -> bool {
667 self.connection.is_healthy()
668 }
669}