Connection

Struct Connection 

pub struct Connection { /* private fields */ }

Bidirectional TCP connection for RocketMQ protocol communication.

Connection handles low-level frame encoding/decoding and provides high-level APIs for sending/receiving RemotingCommand messages. It manages I/O buffers and broadcasts connection state changes via a watch channel.

§Lifecycle & State Management

Tokio Best Practice: Connection health is determined by the results of I/O operations, not by polling a boolean flag. State changes are broadcast via a watch channel:

┌──────────┐  I/O Success   ┌──────────┐
│ Healthy  │ ──────────────► │ Healthy  │
└──────────┘                 └──────────┘
     │                            │
     │ I/O Error                  │ I/O Error
     ↓                            ↓
┌──────────┐                 ┌──────────┐
│ Degraded │                 │ Degraded │
└──────────┘                 └──────────┘
     │                            │
     │ close()                    │
     ↓                            ↓
┌──────────┐                 ┌──────────┐
│  Closed  │                 │  Closed  │
└──────────┘                 └──────────┘
  1. Created: New connection from TcpStream (Healthy)
  2. Active: Processing requests/responses (Healthy)
  3. Degraded: I/O error occurred, broadcast state change
  4. Closed: Stream ended or explicit shutdown
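
The transitions in the diagram can be modeled as a small state machine. The sketch below is illustrative only: the ConnectionState variants come from this page, but the Event enum and the next_state function are hypothetical names. It also assumes that Closed is terminal, that close() is accepted from any state, and that a successful I/O operation leaves the state unchanged. None of this is confirmed to match the crate's implementation.

```rust
/// Connection states named on this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Healthy,
    Degraded,
    Closed,
}

/// Hypothetical events that drive the transitions shown in the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    IoSuccess,
    IoError,
    Close,
}

/// Returns the state that follows `current` after `event`.
/// Assumption: `Closed` is terminal and a successful I/O keeps the current state.
pub fn next_state(current: ConnectionState, event: Event) -> ConnectionState {
    match (current, event) {
        (ConnectionState::Closed, _) => ConnectionState::Closed,
        (_, Event::Close) => ConnectionState::Closed,
        (_, Event::IoError) => ConnectionState::Degraded,
        (state, Event::IoSuccess) => state,
    }
}
```

Keeping the transition rule in one pure function makes it easy to test independently of any socket.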

§Threading

  • Sends go through an internal buffer; the send_*() methods take &mut self, so tasks that share a connection must serialize access to it
  • Receives must be sequential (single reader)
  • State monitoring: Multiple tasks can watch state via subscribe()

§Key Design Principles

  • No explicit ok flag: Connection validity determined by I/O results
  • Broadcast state changes: Using watch channel for reactive updates
  • Fail-fast: I/O errors immediately update state and return error
  • Zero polling: Subscribers notified automatically on state change

Implementations§

impl Connection

pub fn new(tcp_stream: TcpStream) -> Connection

Creates a new Connection instance with initial Healthy state.

§Arguments
  • tcp_stream - The TcpStream associated with the connection
§Returns

A new Connection instance with a watch channel for state monitoring

§Example
use tokio::net::TcpStream;

let stream = TcpStream::connect("127.0.0.1:9876").await?;
let connection = Connection::new(stream);

// Subscribe to state changes
let mut state_watcher = connection.subscribe();
tokio::spawn(async move {
    while state_watcher.changed().await.is_ok() {
        let state = *state_watcher.borrow();
        println!("Connection state: {:?}", state);
    }
});

pub fn inbound_stream(&self) -> &SplitStream<Framed<TcpStream, CompositeCodec>>

Gets a reference to the inbound stream for receiving messages

§Returns

Immutable reference to the inbound message stream


pub fn outbound_sink(&self) -> &SplitSink<Framed<TcpStream, CompositeCodec>, Bytes>

Gets a reference to the outbound sink for sending messages

§Returns

Immutable reference to the outbound message sink


pub async fn receive_command(&mut self) -> Option<RocketMQResult<RemotingCommand>>

Receives the next RemotingCommand from the peer.

Waits asynchronously until a complete frame is available or the stream ends.

§Returns
  • Some(Ok(command)): Successfully received and decoded a command
  • Some(Err(e)): Decoding error occurred
  • None: Stream ended (peer closed connection)
§Example
while let Some(result) = connection.receive_command().await {
    match result {
        Ok(cmd) => handle_command(cmd),
        Err(e) => eprintln!("Decode error: {}", e),
    }
}
// Connection closed

pub async fn send_command(&mut self, command: RemotingCommand) -> RocketMQResult<()>

Sends a RemotingCommand to the peer (consumes command).

Encodes the command into the internal buffer, then flushes to the network. Automatically marks connection as Degraded on I/O errors.

§Arguments
  • command - The command to send (consumed)
§Returns
  • Ok(()): Command successfully sent
  • Err(e): Network I/O error occurred (connection marked as Degraded)
§State Management

On error, this method:

  1. Marks connection as Degraded via watch channel
  2. Broadcasts state change to all subscribers
  3. Returns the error to caller

There is no need to check is_healthy() before calling. Handle the returned Result; the connection state is updated automatically.
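
The error path described above can be sketched with standard-library types only. This is a minimal model, not the crate's implementation: TrackedWriter, LinkState, and AlwaysFails are hypothetical names, and a plain field stands in for the watch channel that Connection uses to broadcast the change.

```rust
use std::io::{self, Write};

/// Hypothetical two-state model of connection health.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinkState {
    Healthy,
    Degraded,
}

/// Wraps a writer and records a failure as soon as an I/O call returns an error.
pub struct TrackedWriter<W: Write> {
    inner: W,
    state: LinkState,
}

impl<W: Write> TrackedWriter<W> {
    pub fn new(inner: W) -> Self {
        TrackedWriter { inner, state: LinkState::Healthy }
    }

    pub fn state(&self) -> LinkState {
        self.state
    }

    /// Writes and flushes `payload`; on error, marks the link Degraded and returns the error.
    pub fn send(&mut self, payload: &[u8]) -> io::Result<()> {
        let result = match self.inner.write_all(payload) {
            Ok(()) => self.inner.flush(),
            Err(e) => Err(e),
        };
        if result.is_err() {
            self.state = LinkState::Degraded;
        }
        result
    }
}

/// Writer whose every write fails, used to exercise the error path.
pub struct AlwaysFails;

impl Write for AlwaysFails {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::BrokenPipe, "peer closed"))
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

The caller never inspects the state before sending; it only sees the Result, and the state is a side effect that observers can read afterwards.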

§Lifecycle
  1. Encode command header + body into reusable buffer
  2. Use zero-copy split_to() to extract buffer contents as Bytes
  3. Send extracted bytes via outbound sink
  4. Buffer is now empty and ready for next command (no clear() needed)
§Performance Optimization
  • Uses split_to(len) to extract the encoded bytes; when len is the full buffer length this behaves like split()
  • split_to(len) returns all the data and leaves the buffer empty, so no clear() is needed
  • freeze() converts BytesMut to Bytes without copying the data
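
The encode-then-extract cycle can be illustrated without the bytes crate. The sketch below is a standard-library analogue under stated assumptions: the frame layout is a simplified, hypothetical one (not the exact RocketMQ wire format), encode_frame and take_frame are made-up names, and std::mem::take stands in for BytesMut::split_to(len). Unlike BytesMut, the emptied Vec does not keep its capacity.

```rust
/// Appends one frame to `buf` using a hypothetical, simplified layout:
/// [u32 length of everything after this field][u32 header length][header][body], big-endian.
pub fn encode_frame(buf: &mut Vec<u8>, header: &[u8], body: &[u8]) {
    let total = 4 + header.len() + body.len();
    buf.extend_from_slice(&(total as u32).to_be_bytes());
    buf.extend_from_slice(&(header.len() as u32).to_be_bytes());
    buf.extend_from_slice(header);
    buf.extend_from_slice(body);
}

/// Moves the encoded bytes out of `buf` and leaves it empty for the next command.
/// Stand-in for `BytesMut::split_to(len)`; no explicit clear is needed afterwards.
pub fn take_frame(buf: &mut Vec<u8>) -> Vec<u8> {
    std::mem::take(buf)
}
```

After take_frame returns, the caller owns the encoded frame and the buffer is already empty, which mirrors step 4 of the lifecycle.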

pub async fn send_command_ref(&mut self, command: &mut RemotingCommand) -> RocketMQResult<()>

Sends a RemotingCommand to the peer (borrows command).

Similar to send_command, but borrows the command mutably instead of consuming it. Use when the caller needs to retain ownership. Automatically marks connection as Degraded on I/O errors.

§Arguments
  • command - Mutable reference to the command to send
§Returns
  • Ok(()): Command successfully sent
  • Err(e): Network I/O error occurred (connection marked as Degraded)
§Note

This method may consume the command’s body (take_body()), modifying the original command.
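
The effect of taking the body through a mutable borrow can be shown with a small stand-in type. In this sketch, Command and encode_ref are hypothetical names, and Option::take plays the role that take_body() is described as playing; the real RemotingCommand layout is not assumed.

```rust
/// Hypothetical stand-in for a command whose body can be moved out.
#[derive(Debug)]
pub struct Command {
    pub header: String,
    pub body: Option<Vec<u8>>,
}

/// Encodes `command` through a mutable borrow; the body is moved out with `Option::take`.
pub fn encode_ref(command: &mut Command) -> Vec<u8> {
    let mut out = command.header.as_bytes().to_vec();
    if let Some(body) = command.body.take() {
        out.extend_from_slice(&body);
    }
    out
}
```

A caller that sends the same command twice would therefore send an empty body the second time, so the body has to be restored first if a resend is intended.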


pub async fn send_batch(&mut self, commands: Vec<RemotingCommand>) -> RocketMQResult<()>

Sends multiple RemotingCommands in a single batch (optimized for throughput).

Automatically marks connection as Degraded on I/O errors.

§Performance Benefits
  • Reduced system calls: Multiple commands sent in one syscall
  • Better CPU cache: Encoding loop stays hot
  • Lower latency: No network round-trips between commands
§Benchmarks
send_command() x 100:  ~50ms  (100 syscalls)
send_batch() x 100:    ~15ms  (1 syscall)
Improvement: 3.3x faster
§Arguments
  • commands - Vector of commands to send (consumed for zero-copy)
§Returns
  • Ok(()): All commands sent successfully
  • Err(e): Network I/O error (connection marked as Degraded)
§Example
let batch = vec![cmd1, cmd2, cmd3];
connection.send_batch(batch).await?;
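
The difference between per-command writes and one batched write can be modeled with a writer that counts calls. This is a simplified sketch, not the crate's code: CountingWriter, send_each, and send_batched are hypothetical names, and each call to write stands in for one system call. Whether a real sink issues exactly one syscall depends on its buffering.

```rust
use std::io::{self, Write};

/// Test writer that records how many times `write` is called.
#[derive(Default)]
pub struct CountingWriter {
    pub writes: usize,
    pub data: Vec<u8>,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes each frame with its own call (one write per command).
pub fn send_each<W: Write>(out: &mut W, frames: &[Vec<u8>]) -> io::Result<()> {
    for frame in frames {
        out.write_all(frame)?;
    }
    out.flush()
}

/// Concatenates all frames into one buffer, then issues a single write.
pub fn send_batched<W: Write>(out: &mut W, frames: &[Vec<u8>]) -> io::Result<()> {
    let total: usize = frames.iter().map(|f| f.len()).sum();
    let mut batch = Vec::with_capacity(total);
    for frame in frames {
        batch.extend_from_slice(frame);
    }
    out.write_all(&batch)?;
    out.flush()
}
```

Both functions put the same bytes on the wire; only the number of write calls differs.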

pub async fn send_bytes(&mut self, bytes: Bytes) -> RocketMQResult<()>

Sends raw Bytes directly to the peer (zero-copy).

Bypasses command encoding and sends pre-serialized bytes directly. Use for forwarding or when bytes are already encoded. Automatically marks connection as Degraded on I/O errors.

§Arguments
  • bytes - The bytes to send (reference-counted, zero-copy)
§Returns
  • Ok(()): Bytes successfully sent
  • Err(e): Network I/O error occurred (connection marked as Degraded)
§Performance

This is the most efficient send method as it avoids intermediate buffering and serialization overhead.
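
The idea behind a reference-counted, zero-copy payload can be shown with the standard library. In this sketch std's Arc<[u8]> is used as a stand-in for Bytes (a swapped-in type, not what the crate uses), and fan_out is a hypothetical helper: cloning the handle shares one allocation instead of copying the payload.

```rust
use std::sync::Arc;

/// Clones the handle so the same payload can be forwarded to `n` peers.
/// Each clone only bumps a reference count; the bytes are not copied.
pub fn fan_out(payload: &Arc<[u8]>, n: usize) -> Vec<Arc<[u8]>> {
    (0..n).map(|_| Arc::clone(payload)).collect()
}
```

This is why forwarding already-encoded bytes is cheap: every recipient points at the same buffer.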


pub async fn send_slice(&mut self, slice: &'static [u8]) -> RocketMQResult<()>

Sends a static byte slice to the peer (zero-copy).

Converts a &'static [u8] to Bytes and sends. Use for compile-time known data (e.g., protocol constants). Automatically marks connection as Degraded on I/O errors.

§Arguments
  • slice - Static byte slice with 'static lifetime
§Returns
  • Ok(()): Slice successfully sent
  • Err(e): Network I/O error occurred (connection marked as Degraded)
§Example
const PING: &[u8] = b"PING\r\n";
connection.send_slice(PING).await?;

pub fn connection_id(&self) -> &ConnectionId

Gets the unique identifier for this connection.

§Returns

Reference to the connection ID (UUID-based string)


pub fn state(&self) -> ConnectionState

Gets the current connection state.

§Returns

Current ConnectionState (Healthy, Degraded, or Closed)

§Performance

This is a fast, in-memory read of the current value from the watch channel. No system calls or network operations are involved.

§Example
if connection.state() == ConnectionState::Healthy {
    // Healthy at the time of the check; the send can still fail, so handle its Result
    connection.send_command(cmd).await?;
}

pub fn is_healthy(&self) -> bool

Checks if the connection is in a healthy state (convenience method).

§Returns
  • true: Connection is Healthy and operational
  • false: Connection is Degraded or Closed
§Note

Prefer using send_*() methods directly rather than checking state first. This method is provided for backward compatibility and specific use cases like connection pool eviction.

Best practice (Tokio-idiomatic):

// Don't do this:
if connection.is_healthy() {
    connection.send_command(cmd).await?;
}

// Do this instead:
match connection.send_command(cmd).await {
    Ok(()) => { /* success */ }
    Err(e) => { /* connection automatically marked as degraded */ }
}
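
For the connection pool eviction use case mentioned in the note, a minimal sketch might look like the following. PooledConnection and evict_unhealthy are hypothetical names; a real pool would hold Connection values and call their is_healthy() method.

```rust
/// Connection states named on this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Healthy,
    Degraded,
    Closed,
}

/// Hypothetical pool entry standing in for a real `Connection`.
#[derive(Debug)]
pub struct PooledConnection {
    pub id: u32,
    pub state: ConnectionState,
}

impl PooledConnection {
    pub fn is_healthy(&self) -> bool {
        self.state == ConnectionState::Healthy
    }
}

/// Drops every entry that is not healthy and returns how many were evicted.
pub fn evict_unhealthy(pool: &mut Vec<PooledConnection>) -> usize {
    let before = pool.len();
    pool.retain(|conn| conn.is_healthy());
    before - pool.len()
}
```

Here the health check is not a gate before sending; it is only used to decide which idle entries to discard.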

pub fn subscribe(&self) -> Receiver<ConnectionState>

Subscribes to connection state changes.

§Returns

A watch::Receiver that notifies on state transitions

§Example: Monitor state in background task
let mut state_watcher = connection.subscribe();
tokio::spawn(async move {
    while state_watcher.changed().await.is_ok() {
        match *state_watcher.borrow() {
            ConnectionState::Healthy => println!("Connection restored"),
            ConnectionState::Degraded => println!("Connection degraded"),
            ConnectionState::Closed => {
                println!("Connection closed");
                break;
            }
        }
    }
});
§Example: Wait for state change with timeout
use std::time::Duration;

let mut state_watcher = connection.subscribe();
tokio::select! {
    _ = state_watcher.changed() => {
        println!("State changed to: {:?}", *state_watcher.borrow());
    }
    _ = tokio::time::sleep(Duration::from_secs(5)) => {
        println!("No state change within 5 seconds");
    }
}

pub fn close(&self)

Explicitly closes the connection and broadcasts Closed state.

§Example
connection.close();
assert_eq!(connection.state(), ConnectionState::Closed);

pub fn connection_is_ok(&self) -> bool

Deprecated since 0.7.0: use is_healthy() or state() instead for clearer semantics.

Legacy alias for backward compatibility.

Trait Implementations§

impl Hash for Connection

fn hash<H: Hasher>(&self, state: &mut H)

Feeds this value into the given Hasher.

fn hash_slice<H>(data: &[Self], state: &mut H)
where H: Hasher, Self: Sized,

Feeds a slice of this type into the given Hasher.

impl PartialEq for Connection

fn eq(&self, other: &Self) -> bool

Tests for self and other values to be equal, and is used by ==.

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.

impl Eq for Connection
