pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> { /* private fields */ }Expand description
High-performance async RocketMQ client with connection pooling and auto-reconnection.
§Architecture
┌─────────────────────────────────────────────────────────┐
│ RocketmqDefaultClient<PR> │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────┐ ┌──────────────────┐ │
│ │ Connection Pool│ ───► │NameServer Router │ │
│ │ (DashMap) │ │ (Health-based) │ │
│ └────────────────┘ └──────────────────┘ │
│ │ │ │
│ ↓ ↓ │
│ ┌────────────────┐ ┌──────────────────┐ │
│ │ Request Handler│ ───► │ Response Table │ │
│ │ (async tasks) │ │ (oneshot rx) │ │
│ └────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘§Key Features
- Connection Pooling: Reuses TCP connections to brokers/nameservers
- Auto-Reconnection: Exponential backoff retry on connection failures
- Smart Routing: Selects healthiest nameserver based on latency/errors
- Request Multiplexing: Multiple concurrent requests per connection
- Graceful Shutdown: Drains in-flight requests before closing
§Performance Characteristics
- Lock Contention: Uses
Arc<Mutex<HashMap>>for connection pool- ⚠️ TODO: Migrate to
DashMapfor lock-free reads
- ⚠️ TODO: Migrate to
- Memory: O(N) where N = number of unique broker addresses
- Latency: Single async hop for cached connections, 2-3 hops for new
§Type Parameters
PR- Request processor type (default:DefaultRemotingRequestProcessor)
§Example
ⓘ
use std::sync::Arc;
use rocketmq_remoting::clients::RocketmqDefaultClient;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
let config = Arc::new(TokioClientConfig::default());
let processor = Default::default();
let client = RocketmqDefaultClient::new(config, processor);
// Update nameserver list
client
.update_name_server_address_list(vec!["127.0.0.1:9876".into()])
.await;
// Send request
let response = client
.invoke_request(
None, // use default nameserver
request, 3000, // 3s timeout
)
.await?;Implementations§
Source§impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR>
impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR>
pub fn new(tokio_client_config: Arc<TokioClientConfig>, processor: PR) -> Self
pub fn new_with_cl( tokio_client_config: Arc<TokioClientConfig>, processor: PR, tx: Option<Sender<ConnectionNetEvent>>, ) -> Self
Source§impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR>
impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR>
Sourcepub fn enable_connection_pool(
&mut self,
max_connections: usize,
max_idle_duration: Duration,
cleanup_interval: Duration,
) -> JoinHandle<()>
pub fn enable_connection_pool( &mut self, max_connections: usize, max_idle_duration: Duration, cleanup_interval: Duration, ) -> JoinHandle<()>
Enable advanced connection pool with metrics and automatic cleanup.
§Arguments
max_connections- Maximum number of connections (0 = unlimited)max_idle_duration- Idle timeout (e.g., 5 minutes)cleanup_interval- Cleanup task interval (e.g., 30 seconds)
§Returns
Task handle for the cleanup background task (can be aborted)
§Example
ⓘ
let client =
RocketmqDefaultClient::new(Arc::new(TokioClientConfig::default()), Default::default());
// Enable connection pool with:
// - Max 1000 connections
// - 5 minute idle timeout
// - 30 second cleanup interval
let cleanup_task =
client.enable_connection_pool(1000, Duration::from_secs(300), Duration::from_secs(30));
// ... use client ...
// Stop cleanup when shutting down
cleanup_task.abort();Sourcepub fn get_pool_stats(&self) -> Option<PoolStats>
pub fn get_pool_stats(&self) -> Option<PoolStats>
Get connection pool statistics (if enabled).
§Returns
Some(stats)- Pool statisticsNone- Connection pool not enabled
§Example
ⓘ
if let Some(stats) = client.get_pool_stats() {
println!(
"Pool: {}/{} connections ({:.1}% util)",
stats.active(),
stats.max_connections,
stats.utilization() * 100.0
);
println!("Error rate: {:.2}%", stats.error_rate() * 100.0);
}Trait Implementations§
Source§impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqDefaultClient<PR>
impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqDefaultClient<PR>
Source§async fn invoke_request(
&self,
addr: Option<&CheetahString>,
request: RemotingCommand,
timeout_millis: u64,
) -> RocketMQResult<RemotingCommand>
async fn invoke_request( &self, addr: Option<&CheetahString>, request: RemotingCommand, timeout_millis: u64, ) -> RocketMQResult<RemotingCommand>
Send request and wait for response with timeout.
§HOT PATH
This is the primary client API - optimize for low latency and high throughput.
§Flow
1. Get/create client connection (~100ns fast path, ~50ms slow)
2. Spawn send task on runtime (~10μs)
3. Apply timeout wrapper (~100ns)
4. Wait for response via oneshot (network RTT + processing)
5. Unwrap nested Result layers (~10ns)§Performance Optimizations
- Early validation: Check client availability before expensive spawn
- Flat error handling: Reduce nested
matchoverhead - Direct await: Spawn only for timeout enforcement (could be optimized further)
§Error Handling
Returns RocketmqError::RemoteError for all failures:
- Client unavailable (no connection)
- Network I/O error (send/recv failure)
- Timeout (no response within deadline)
- Task spawn error (runtime shutdown)
§Arguments
addr- Target address (None = use nameserver)request- Command to sendtimeout_millis- Max wait time for response
§Examples
ⓘ
let request = RemotingCommand::create_request_command(/* ... */);
let response = client.invoke_request(
Some(&"127.0.0.1:10911".into()),
request,
3000 // 3 second timeout
).await?;Source§async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>)
async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>)
Updates the list of name remoting_server addresses. Read more
Source§fn get_name_server_address_list(&self) -> &[CheetahString]
fn get_name_server_address_list(&self) -> &[CheetahString]
Retrieves the current list of name remoting_server addresses. Read more
Source§fn get_available_name_srv_list(&self) -> Vec<CheetahString>
fn get_available_name_srv_list(&self) -> Vec<CheetahString>
Retrieves a list of available name remoting_server addresses. Read more
Source§async fn invoke_request_oneway(
&self,
addr: &CheetahString,
request: RemotingCommand,
timeout_millis: u64,
)
async fn invoke_request_oneway( &self, addr: &CheetahString, request: RemotingCommand, timeout_millis: u64, )
Invokes a command on a specified address without waiting for a response. Read more
Source§fn is_address_reachable(&mut self, addr: &CheetahString)
fn is_address_reachable(&mut self, addr: &CheetahString)
Checks if a specified address is reachable. Read more
Source§fn close_clients(&mut self, addrs: Vec<String>)
fn close_clients(&mut self, addrs: Vec<String>)
Closes clients connected to the specified addresses. Read more
fn register_processor(&mut self, processor: impl RequestProcessor + Sync)
Source§impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR>
impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR>
Source§async fn start(&self, this: WeakArcMut<Self>)
async fn start(&self, this: WeakArcMut<Self>)
Asynchronously starts the remoting service. Read more
Source§fn clear_rpc_hook(&mut self)
fn clear_rpc_hook(&mut self)
Clears all registered RPC hooks. Read more
Auto Trait Implementations§
impl<PR = DefaultRemotingRequestProcessor> !Freeze for RocketmqDefaultClient<PR>
impl<PR = DefaultRemotingRequestProcessor> !RefUnwindSafe for RocketmqDefaultClient<PR>
impl<PR> Send for RocketmqDefaultClient<PR>
impl<PR> Sync for RocketmqDefaultClient<PR>
impl<PR> Unpin for RocketmqDefaultClient<PR>
impl<PR = DefaultRemotingRequestProcessor> !UnwindSafe for RocketmqDefaultClient<PR>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
Causes
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
Causes
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
Causes
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
Causes
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
Causes
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
Causes
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
Causes
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
Causes
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Pipes by value. This is generally the method you want to use. Read more
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
Borrows
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
Mutably borrows
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows
self, then passes self.deref() into the pipe function.Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls
.tap_deref() only in debug builds, and is erased in release
builds.