Struct RefactoredConnection

pub struct RefactoredConnection { /* private fields */ }

A refactored connection built on FramedRead and FramedWrite.

§Architecture Design

+-----------------------------------------------------------+
|             RefactoredConnection                          |
+-----------------------------------------------------------+
|                                                           |
| Read Side (FramedRead):                                   |
| TcpStream -> OwnedReadHalf -> FramedRead<Codec>           |
|             |                   |                         |
|      Direct Access         Auto Decode                    |
|                                                           |
| Write Side (FramedWrite - Lock-free):                     |
| TcpStream -> OwnedWriteHalf -> FramedWrite<Codec>         |
|             |                   |                         |
|      Zero-copy Write        Codec Encode                  |
|                                                           |
+-----------------------------------------------------------+

§Core Advantages

  1. Separated Read/Write: The read and write halves are managed independently after into_split()
  2. Dual Mode Support: Codec encoding plus zero-copy direct writes
  3. High Performance: Uses FramedWrite’s built-in optimizations, which can be bypassed for direct writes
  4. Flexible Control: Callers have full control over flush timing
  5. Lock-free Write: The writer doesn’t use a Mutex; the caller ensures exclusive access

§Concurrency Model

  • Reader: No Mutex needed; typically a single receive loop does all the reading
  • Writer: Lock-free design; the caller ensures exclusive access (via &mut self)
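
As a rough illustration of the writer side of this model, the sketch below uses only the standard library. SketchWriter, send_frame and shared_send_demo are hypothetical names, not part of this crate. The point is only that a method taking &mut self lets the borrow checker guarantee one writer at a time, so any locking is left to callers that actually need to share the value.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the write side of a connection (not this crate's type).
struct SketchWriter {
    /// Bytes "sent" so far; a real connection would own a socket half instead.
    sent: Vec<u8>,
}

impl SketchWriter {
    fn new() -> Self {
        SketchWriter { sent: Vec::new() }
    }

    /// Takes `&mut self`: the compiler rejects two simultaneous callers,
    /// so the type itself needs no internal lock.
    fn send_frame(&mut self, frame: &[u8]) {
        self.sent.extend_from_slice(frame);
    }
}

/// Sharing across threads is the caller's job: here the caller wraps the
/// writer in a Mutex only because it chose to share it.
fn shared_send_demo() -> Vec<u8> {
    let writer = Arc::new(Mutex::new(SketchWriter::new()));
    let handles: Vec<_> = (0..4u8)
        .map(|i| {
            let writer = Arc::clone(&writer);
            thread::spawn(move || {
                // The lock hands out the `&mut` access that `send_frame` requires.
                writer.lock().unwrap().send_frame(&[i]);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Thread scheduling decides the arrival order, so sort before returning.
    let mut sent = writer.lock().unwrap().sent.clone();
    sent.sort();
    sent
}
```

A caller that keeps the writer on a single task can skip the Mutex entirely and call send_frame directly.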

Implementations§

impl RefactoredConnection

pub fn new(stream: TcpStream) -> Self

Creates a new connection from the given TcpStream.

pub fn with_capacity(stream: TcpStream, capacity: usize) -> Self

Creates a connection with the specified buffer capacity.

pub async fn send_command(&mut self, command: RemotingCommand) -> RocketMQResult<()>

Sends a RemotingCommand through the codec.

The RemotingCommand is first encoded to Bytes (containing a complete RocketMQ protocol frame), then sent directly via CompositeCodec’s BytesCodec.

§Performance Optimization

Reuses the internal encode_buffer to avoid allocating a new BytesMut on each call.

pub async fn recv_command(&mut self) -> RocketMQResult<Option<RemotingCommand>>

Receives and decodes a message.

CompositeCodec automatically decodes incoming frames into RemotingCommand.

Note: This method takes &mut self, so only one receive loop can read at a time.
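
The Result<Option<_>> return shape suggests a single receive loop along the lines of the sketch below. This is a synchronous, std-only mock: MockReceiver, receive_loop and receive_demo are hypothetical names, commands are modelled as plain strings, and it assumes Ok(None) means the peer closed the stream, which this page does not state explicitly.

```rust
use std::collections::VecDeque;

/// Hypothetical, synchronous stand-in for a connection's receive side.
struct MockReceiver {
    /// Queued results; an empty queue is treated as end of stream.
    queue: VecDeque<Result<String, String>>,
}

impl MockReceiver {
    /// Mirrors the `Result<Option<_>>` shape: `Ok(Some(cmd))` is a decoded
    /// command, `Ok(None)` is ASSUMED to mean the peer closed the stream.
    fn recv_command(&mut self) -> Result<Option<String>, String> {
        match self.queue.pop_front() {
            Some(Ok(cmd)) => Ok(Some(cmd)),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }
}

/// Single receive loop: handle commands until end of stream or the first error.
fn receive_loop(conn: &mut MockReceiver) -> Result<Vec<String>, String> {
    let mut handled = Vec::new();
    while let Some(cmd) = conn.recv_command()? {
        handled.push(cmd);
    }
    Ok(handled)
}

/// Feeds two commands through the loop and returns what was handled.
fn receive_demo() -> Result<Vec<String>, String> {
    let mut conn = MockReceiver {
        queue: VecDeque::from(vec![Ok("HEARTBEAT".to_string()), Ok("PULL".to_string())]),
    };
    receive_loop(&mut conn)
}
```

With the real async method, the loop body would await each call instead.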

pub async fn send_bytes(&mut self, bytes: Bytes) -> RocketMQResult<()>

Sends raw Bytes directly, bypassing the codec.

§Note

The input bytes must already contain a complete RocketMQ protocol frame (i.e., data already encoded via fast_header_encode).

To send a RemotingCommand, use the send_command method instead.

pub async fn send_commands_batch(&mut self, commands: Vec<RemotingCommand>) -> RocketMQResult<()>

Sends a batch of RemotingCommand values (using feed + flush).

This is the recommended way to send a batch, because it reduces the number of system calls.

§Performance Optimization

Reuses the internal encode_buffer to avoid allocating a new BytesMut each time. split() leaves the buffer empty while retaining its remaining capacity, so the buffer can be reused without a new allocation as long as capacity remains.
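
The effect of queueing frames and flushing once can be sketched with the standard library alone. This is an analogy, not this crate's implementation: std's BufWriter stands in for FramedWrite, and CountingSink and send_batch are hypothetical names. The sink counts calls to write, which stand in for socket system calls.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical sink that counts how often the underlying `write` is invoked,
/// standing in for the number of socket system calls.
struct CountingSink {
    writes: usize,
    data: Vec<u8>,
}

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Queues every frame in the buffer, then flushes once at the end.
/// Returns the number of underlying writes and the bytes that arrived.
fn send_batch(frames: &[Vec<u8>]) -> io::Result<(usize, Vec<u8>)> {
    let sink = CountingSink { writes: 0, data: Vec::new() };
    let mut writer = BufWriter::with_capacity(1024, sink);
    for frame in frames {
        // Buffered only (comparable to `feed`); nothing reaches the sink yet
        // as long as the frames fit into the buffer.
        writer.write_all(frame)?;
    }
    // One pass to the sink (comparable to `flush`).
    writer.flush()?;
    let sink = writer.get_ref();
    Ok((sink.writes, sink.data.clone()))
}
```

For three small frames the sink sees a single write carrying all of them in order. Frames larger than the buffer would be written through immediately, so the saving applies to small frames.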

pub async fn send_bytes_batch(&mut self, chunks: Vec<Bytes>) -> RocketMQResult<()>

Sends a batch of raw Bytes, bypassing the codec.

§Note

The input chunks must already contain complete RocketMQ protocol frames.

§Performance

Writes directly with write_all, avoiding the overhead of the codec layer.

pub async fn send_bytes_zero_copy(&mut self, chunks: Vec<Bytes>) -> RocketMQResult<()>

Zero-copy send that bypasses the codec and uses write_vectored.

§Implementation

Accesses the underlying OwnedWriteHalf via get_mut() and uses write_vectored for zero-copy scatter-gather I/O.

§Note

The FramedWrite buffer must be flushed first to ensure correct data ordering.
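
Scatter-gather writing can be sketched with the standard library's synchronous Write::write_vectored and IoSlice. This is not this crate's code: write_all_chunks and vectored_demo are hypothetical helpers, and the loop shows one way to handle the partial writes that a vectored call is allowed to return.

```rust
use std::io::{self, IoSlice, Write};

/// Writes every chunk with vectored I/O, retrying after partial writes.
/// Returns the number of `write_vectored` calls that were needed.
fn write_all_chunks<W: Write>(writer: &mut W, chunks: &[&[u8]]) -> io::Result<usize> {
    let mut calls = 0;
    let mut index = 0; // first chunk that is not fully written
    let mut offset = 0; // bytes of `chunks[index]` already written
    while index < chunks.len() {
        // Skip chunks that are empty or already fully written.
        if offset == chunks[index].len() {
            index += 1;
            offset = 0;
            continue;
        }
        // Build the slice list for the unwritten remainder without copying data.
        let mut slices = Vec::with_capacity(chunks.len() - index);
        slices.push(IoSlice::new(&chunks[index][offset..]));
        slices.extend(chunks[index + 1..].iter().map(|c| IoSlice::new(c)));

        let mut written = writer.write_vectored(&slices)?;
        calls += 1;
        if written == 0 {
            return Err(io::ErrorKind::WriteZero.into());
        }
        // Advance past everything the writer accepted.
        while written > 0 && index < chunks.len() {
            let remaining = chunks[index].len() - offset;
            if written >= remaining {
                written -= remaining;
                index += 1;
                offset = 0;
            } else {
                offset += written;
                written = 0;
            }
        }
    }
    Ok(calls)
}

/// Writes three chunks into a Vec<u8>, which accepts a whole batch per call.
fn vectored_demo() -> io::Result<(usize, Vec<u8>)> {
    let mut sink = Vec::new();
    let chunks: [&[u8]; 3] = [b"header", b"body-1", b"body-2"];
    let calls = write_all_chunks(&mut sink, &chunks)?;
    Ok((calls, sink))
}
```

Only the slice descriptors are assembled per call; the chunk bytes themselves are never copied into an intermediate buffer by this helper.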

pub async fn send_bytes_zero_copy_single(&mut self, data: Bytes) -> RocketMQResult<()>

Zero-copy send of a single chunk.

Suitable for sending a single large block of data.

pub async fn send_response_hybrid(&mut self, response_header: RemotingCommand, message_bodies: Vec<Bytes>) -> RocketMQResult<()>

Hybrid mode: response header (codec) + message bodies (zero-copy).

This is the recommended implementation for Pull message responses.

§Flow

  1. Send the response header (encoded as a complete RocketMQ frame via BytesCodec)
  2. Flush to ensure the response header is sent
  3. Zero-copy send the message bodies (bypassing the codec, direct write_all)
  4. Final flush

§Performance Optimization

Reuses the internal encode_buffer; split() leaves the buffer empty while retaining its remaining capacity.
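
The four steps of this flow can be sketched synchronously with the standard library, with BufWriter standing in for the buffered codec layer and get_mut() reaching the inner writer. send_hybrid and hybrid_demo are hypothetical names, and the byte strings are placeholders rather than real RocketMQ frames.

```rust
use std::io::{self, BufWriter, Write};

/// Sends a buffered header followed by bodies written straight to the inner writer.
fn send_hybrid<W: Write>(
    writer: &mut BufWriter<W>,
    header: &[u8],
    bodies: &[&[u8]],
) -> io::Result<()> {
    // 1. Queue the header in the buffered layer.
    writer.write_all(header)?;
    // 2. Flush so the header reaches the inner writer before any direct write;
    //    skipping this step could let a body overtake the header.
    writer.flush()?;
    // 3. Bypass the buffer and write each body directly.
    for body in bodies {
        writer.get_mut().write_all(body)?;
    }
    // 4. Final flush.
    writer.flush()
}

/// Runs the flow against an in-memory sink and returns the bytes in arrival order.
fn hybrid_demo() -> io::Result<Vec<u8>> {
    let mut writer = BufWriter::new(Vec::new());
    let bodies: [&[u8]; 2] = [b"msg-1", b"msg-2"];
    send_hybrid(&mut writer, b"HDR|", &bodies)?;
    Ok(writer.get_ref().clone())
}
```

The flush in step 2 is what keeps the header ahead of the bodies, which matches the ordering note on the zero-copy methods.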

pub async fn send_response_hybrid_vectored(&mut self, response_header_bytes: Bytes, message_bodies: Vec<Bytes>) -> RocketMQResult<()>

Optimized hybrid mode using write_vectored.

Sends the response header and all message bodies in one shot (scatter-gather I/O).

§Parameters

  • response_header_bytes: Pre-encoded response header (already encoded via CompositeCodec)
  • message_bodies: List of message bodies

§Performance

This is the highest-performance sending method: in the common case it needs only one system call. A vectored write may accept only part of the data, in which case more calls are needed.

pub fn state(&self) -> ConnectionState

Returns the current connection state.

pub fn mark_degraded(&self)

Marks the connection as degraded.

Indicates that connection quality has degraded but the connection is still usable.

pub fn mark_healthy(&self)

Marks the connection as healthy.

pub async fn close(&mut self) -> RocketMQResult<()>

Closes the connection.

§Flow

  1. Mark the state as Closed
  2. Flush all pending data
  3. Shut down the underlying TCP connection

pub fn subscribe_state(&self) -> Receiver<ConnectionState>

Subscribes to connection state changes.

Returns a watch::Receiver that can be used to monitor state changes.

pub fn connection_id(&self) -> &CheetahString

Returns the connection ID.

pub fn framed_reader_mut(&mut self) -> &mut FramedRead<OwnedReadHalf, CompositeCodec>

Returns a mutable reference to the FramedRead.

Used for advanced read operations.

Note: Requires &mut self to ensure exclusive access.

pub fn framed_writer_mut(&mut self) -> &mut FramedWrite<OwnedWriteHalf, CompositeCodec>

Returns a mutable reference to the FramedWrite.

Used for advanced write operations.

Note: Requires &mut self to ensure exclusive access.
