pub struct Connection { /* private fields */ }Expand description
Bidirectional TCP connection for RocketMQ protocol communication.
Connection handles low-level frame encoding/decoding and provides high-level
APIs for sending/receiving RemotingCommand messages. It manages I/O buffers
and broadcasts connection state changes via a watch channel.
§Lifecycle & State Management
Tokio Best Practice: Connection health is determined by I/O operation results,
not by polling a boolean flag. State changes are broadcast via watch channel:
┌──────────┐ I/O Success ┌──────────┐
│ Healthy │ ──────────────► │ Healthy │
└──────────┘ └──────────┘
│ │
│ I/O Error │ I/O Error
↓ ↓
┌──────────┐ ┌──────────┐
│ Degraded │ │ Degraded │
└──────────┘ └──────────┘
│ │
│ close() │
↓ ↓
┌──────────┐ ┌──────────┐
│ Closed │ │ Closed │
└──────────┘ └──────────┘- Created: New connection from
TcpStream(Healthy) - Active: Processing requests/responses (Healthy)
- Degraded: I/O error occurred, broadcast state change
- Closed: Stream ended or explicit shutdown
§Threading
- Safe for concurrent sends (internal buffering)
- Receives must be sequential (single reader)
- State monitoring: Multiple tasks can watch state via
subscribe()
§Key Design Principles
- No explicit
okflag: Connection validity determined by I/O results - Broadcast state changes: Using
watchchannel for reactive updates - Fail-fast: I/O errors immediately update state and return error
- Zero polling: Subscribers notified automatically on state change
Implementations§
Source§impl Connection
impl Connection
Sourcepub fn new(tcp_stream: TcpStream) -> Connection
pub fn new(tcp_stream: TcpStream) -> Connection
Creates a new Connection instance with initial Healthy state.
§Arguments
tcp_stream- TheTcpStreamassociated with the connection
§Returns
A new Connection instance with a watch channel for state monitoring
§Example
let stream = TcpStream::connect("127.0.0.1:9876").await?;
let connection = Connection::new(stream);
// Subscribe to state changes
let mut state_watcher = connection.subscribe();
tokio::spawn(async move {
while state_watcher.changed().await.is_ok() {
let state = *state_watcher.borrow();
println!("Connection state: {:?}", state);
}
});Sourcepub fn inbound_stream(&self) -> &SplitStream<Framed<TcpStream, CompositeCodec>>
pub fn inbound_stream(&self) -> &SplitStream<Framed<TcpStream, CompositeCodec>>
Gets a reference to the inbound stream for receiving messages
§Returns
Immutable reference to the inbound message stream
Sourcepub fn outbound_sink(
&self,
) -> &SplitSink<Framed<TcpStream, CompositeCodec>, Bytes>
pub fn outbound_sink( &self, ) -> &SplitSink<Framed<TcpStream, CompositeCodec>, Bytes>
Gets a reference to the outbound sink for sending messages
§Returns
Immutable reference to the outbound message sink
Sourcepub async fn receive_command(
&mut self,
) -> Option<RocketMQResult<RemotingCommand>>
pub async fn receive_command( &mut self, ) -> Option<RocketMQResult<RemotingCommand>>
Receives the next RemotingCommand from the peer.
Blocks until a complete frame is available or the stream ends.
§Returns
Some(Ok(command)): Successfully received and decoded a commandSome(Err(e)): Decoding error occurredNone: Stream ended (peer closed connection)
§Example
while let Some(result) = connection.receive_command().await {
match result {
Ok(cmd) => handle_command(cmd),
Err(e) => eprintln!("Decode error: {}", e),
}
}
// Connection closedSourcepub async fn send_command(
&mut self,
command: RemotingCommand,
) -> RocketMQResult<()>
pub async fn send_command( &mut self, command: RemotingCommand, ) -> RocketMQResult<()>
Sends a RemotingCommand to the peer (consumes command).
Encodes the command into the internal buffer, then flushes to the network. Automatically marks connection as Degraded on I/O errors.
§Arguments
command- The command to send (consumed)
§Returns
Ok(()): Command successfully sentErr(e): Network I/O error occurred (connection marked as Degraded)
§State Management
On error, this method:
- Marks connection as
Degradedvia watch channel - Broadcasts state change to all subscribers
- Returns the error to caller
No need to explicitly check is_healthy() before calling - just
handle the Result and the connection state is automatically managed.
§Lifecycle
- Encode command header + body into reusable buffer
- Use zero-copy
split_to()to extract buffer contents asBytes - Send extracted bytes via outbound sink
- Buffer is now empty and ready for next command (no clear() needed)
§Performance Optimization
- Uses
split_to(len)instead ofsplit()for better performance split_to()returns all data and leaves buffer empty, eliminating need for clear()freeze()converts BytesMut to Bytes with zero-copy (just refcount increment)
Sourcepub async fn send_command_ref(
&mut self,
command: &mut RemotingCommand,
) -> RocketMQResult<()>
pub async fn send_command_ref( &mut self, command: &mut RemotingCommand, ) -> RocketMQResult<()>
Sends a RemotingCommand to the peer (borrows command).
Similar to send_command, but borrows the command mutably instead of
consuming it. Use when the caller needs to retain ownership.
Automatically marks connection as Degraded on I/O errors.
§Arguments
command- Mutable reference to the command to send
§Returns
Ok(()): Command successfully sentErr(e): Network I/O error occurred (connection marked as Degraded)
§Note
This method may consume the command’s body (take_body()), modifying
the original command.
Sourcepub async fn send_batch(
&mut self,
commands: Vec<RemotingCommand>,
) -> RocketMQResult<()>
pub async fn send_batch( &mut self, commands: Vec<RemotingCommand>, ) -> RocketMQResult<()>
Sends multiple RemotingCommands in a single batch (optimized for throughput).
Automatically marks connection as Degraded on I/O errors.
§Performance Benefits
- Reduced system calls: Multiple commands sent in one syscall
- Better CPU cache: Encoding loop stays hot
- Lower latency: No network round-trips between commands
§Benchmarks
send_command() x 100: ~50ms (100 syscalls)
send_batch() x 100: ~15ms (1 syscall)
Improvement: 3.3x faster§Arguments
commands- Vector of commands to send (consumed for zero-copy)
§Returns
Ok(()): All commands sent successfullyErr(e): Network I/O error (connection marked as Degraded)
§Example
let batch = vec![cmd1, cmd2, cmd3];
connection.send_batch(batch).await?;Sourcepub async fn send_bytes(&mut self, bytes: Bytes) -> RocketMQResult<()>
pub async fn send_bytes(&mut self, bytes: Bytes) -> RocketMQResult<()>
Sends raw Bytes directly to the peer (zero-copy).
Bypasses command encoding and sends pre-serialized bytes directly. Use for forwarding or when bytes are already encoded. Automatically marks connection as Degraded on I/O errors.
§Arguments
bytes- The bytes to send (reference-counted, zero-copy)
§Returns
Ok(()): Bytes successfully sentErr(e): Network I/O error occurred (connection marked as Degraded)
§Performance
This is the most efficient send method as it avoids intermediate buffering and serialization overhead.
Sourcepub async fn send_slice(&mut self, slice: &'static [u8]) -> RocketMQResult<()>
pub async fn send_slice(&mut self, slice: &'static [u8]) -> RocketMQResult<()>
Sends a static byte slice to the peer (zero-copy).
Converts a &'static [u8] to Bytes and sends. Use for compile-time
known data (e.g., protocol constants).
Automatically marks connection as Degraded on I/O errors.
§Arguments
slice- Static byte slice with'staticlifetime
§Returns
Ok(()): Slice successfully sentErr(e): Network I/O error occurred (connection marked as Degraded)
§Example
const PING: &[u8] = b"PING\r\n";
connection.send_slice(PING).await?;Sourcepub fn connection_id(&self) -> &ConnectionId
pub fn connection_id(&self) -> &ConnectionId
Gets the unique identifier for this connection.
§Returns
Reference to the connection ID (UUID-based string)
Sourcepub fn state(&self) -> ConnectionState
pub fn state(&self) -> ConnectionState
Gets the current connection state.
§Returns
Current ConnectionState (Healthy, Degraded, or Closed)
§Performance
This is a fast, lock-free read from the watch channel receiver. No system calls or network operations involved.
§Example
if connection.state() == ConnectionState::Healthy {
// Safe to send
connection.send_command(cmd).await?;
}Sourcepub fn is_healthy(&self) -> bool
pub fn is_healthy(&self) -> bool
Checks if the connection is in a healthy state (convenience method).
§Returns
true: Connection isHealthyand operationalfalse: Connection isDegradedorClosed
§Note
Prefer using send_*() methods directly rather than checking state first.
This method is provided for backward compatibility and specific use cases
like connection pool eviction.
Best practice (Tokio-idiomatic):
// Don't do this:
if connection.is_healthy() {
connection.send_command(cmd).await?;
}
// Do this instead:
match connection.send_command(cmd).await {
Ok(()) => { /* success */ }
Err(e) => { /* connection automatically marked as degraded */ }
}Sourcepub fn subscribe(&self) -> Receiver<ConnectionState>
pub fn subscribe(&self) -> Receiver<ConnectionState>
Subscribes to connection state changes.
§Returns
A watch::Receiver that notifies on state transitions
§Example: Monitor state in background task
let mut state_watcher = connection.subscribe();
tokio::spawn(async move {
while state_watcher.changed().await.is_ok() {
match *state_watcher.borrow() {
ConnectionState::Healthy => println!("Connection restored"),
ConnectionState::Degraded => println!("Connection degraded"),
ConnectionState::Closed => {
println!("Connection closed");
break;
}
}
}
});§Example: Wait for state change with timeout
let mut state_watcher = connection.subscribe();
tokio::select! {
_ = state_watcher.changed() => {
println!("State changed to: {:?}", *state_watcher.borrow());
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
println!("No state change within 5 seconds");
}
}Sourcepub fn connection_is_ok(&self) -> bool
👎Deprecated since 0.7.0: Use is_healthy() or state() instead
pub fn connection_is_ok(&self) -> bool
is_healthy() or state() insteadLegacy alias for backward compatibility.
§Deprecated
Use is_healthy() or state() instead for clearer semantics.
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for Connection
impl !RefUnwindSafe for Connection
impl Send for Connection
impl Sync for Connection
impl Unpin for Connection
impl !UnwindSafe for Connection
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<Q, K> Equivalent<K> for Q
impl<Q, K> Equivalent<K> for Q
Source§impl<Q, K> Equivalent<K> for Q
impl<Q, K> Equivalent<K> for Q
Source§impl<Q, K> Equivalent<K> for Q
impl<Q, K> Equivalent<K> for Q
Source§fn equivalent(&self, key: &K) -> bool
fn equivalent(&self, key: &K) -> bool
key and return true if they are equal.Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.