rocketmq_remoting/clients/rocketmq_tokio_client.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::sync::atomic::AtomicI32;
20use std::sync::Arc;
21use std::time::Duration;
22
23use cheetah_string::CheetahString;
24use dashmap::DashMap;
25use rand::Rng;
26use rocketmq_runtime::RocketMQRuntime;
27use rocketmq_rust::ArcMut;
28use rocketmq_rust::WeakArcMut;
29use tokio::time;
30use tracing::debug;
31use tracing::error;
32use tracing::info;
33use tracing::warn;
34
35use crate::base::connection_net_event::ConnectionNetEvent;
36use crate::clients::connection_pool::ConnectionPool;
37use crate::clients::nameserver_selector::LatencyTracker;
38use crate::clients::reconnect::CircuitBreaker;
39use crate::clients::Client;
40use crate::clients::RemotingClient;
41use crate::protocol::remoting_command::RemotingCommand;
42use crate::remoting::inner::RemotingGeneralHandler;
43use crate::remoting::RemotingService;
44use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
45use crate::runtime::config::client_config::TokioClientConfig;
46use crate::runtime::processor::RequestProcessor;
47use crate::runtime::RPCHook;
48
/// Lock acquisition timeout in milliseconds (3 seconds).
///
/// NOTE(review): not referenced anywhere in this file; presumably a leftover from an earlier
/// lock-based connection table. Confirm and remove if it is unused crate-wide.
const LOCK_TIMEOUT_MILLIS: u64 = 3_000;
50
51/// High-performance async RocketMQ client with connection pooling and auto-reconnection.
52///
53/// # Architecture
54///
55/// ```text
56/// ┌─────────────────────────────────────────────────────────┐
57/// │ RocketmqDefaultClient<PR> │
58/// ├─────────────────────────────────────────────────────────┤
59/// │ │
60/// │ ┌────────────────┐ ┌──────────────────┐ │
61/// │ │ Connection Pool│ ───► │NameServer Router │ │
62/// │ │ (DashMap) │ │ (Health-based) │ │
63/// │ └────────────────┘ └──────────────────┘ │
64/// │ │ │ │
65/// │ ↓ ↓ │
66/// │ ┌────────────────┐ ┌──────────────────┐ │
67/// │ │ Request Handler│ ───► │ Response Table │ │
68/// │ │ (async tasks) │ │ (oneshot rx) │ │
69/// │ └────────────────┘ └──────────────────┘ │
70/// │ │
71/// └─────────────────────────────────────────────────────────┘
72/// ```
73///
74/// # Key Features
75///
76/// - **Connection Pooling**: Reuses TCP connections to brokers/nameservers
77/// - **Auto-Reconnection**: Exponential backoff retry on connection failures
78/// - **Smart Routing**: Selects healthiest nameserver based on latency/errors
79/// - **Request Multiplexing**: Multiple concurrent requests per connection
80/// - **Graceful Shutdown**: Drains in-flight requests before closing
81///
82/// # Performance Characteristics
83///
84/// - **Lock Contention**: Uses `Arc<Mutex<HashMap>>` for connection pool
85/// - ⚠️ TODO: Migrate to `DashMap` for lock-free reads
86/// - **Memory**: O(N) where N = number of unique broker addresses
87/// - **Latency**: Single async hop for cached connections, 2-3 hops for new
88///
89/// # Type Parameters
90///
91/// * `PR` - Request processor type (default: `DefaultRemotingRequestProcessor`)
92///
93/// # Example
94///
95/// ```rust,ignore
96/// use std::sync::Arc;
97///
98/// use rocketmq_remoting::clients::RocketmqDefaultClient;
99/// use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
100///
101/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
102/// let config = Arc::new(TokioClientConfig::default());
103/// let processor = Default::default();
104/// let client = RocketmqDefaultClient::new(config, processor);
105///
106/// // Update nameserver list
107/// client
108/// .update_name_server_address_list(vec!["127.0.0.1:9876".into()])
109/// .await;
110///
111/// // Send request
112/// let response = client
113/// .invoke_request(
114/// None, // use default nameserver
115/// request, 3000, // 3s timeout
116/// )
117/// .await?;
118/// # Ok(())
119/// # }
120/// ```
121pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
122 /// Client configuration (timeouts, buffer sizes, etc.)
123 ///
124 /// Shared across all connections to avoid duplication
125 tokio_client_config: Arc<TokioClientConfig>,
126
127 /// Connection pool: `addr -> Client` mapping
128 ///
129 /// **Lock-Free Design**: Uses `DashMap` for concurrent access without Mutex
130 /// - Read operations (get): Zero-lock overhead
131 /// - Write operations (insert/remove): Fine-grained per-shard locking
132 /// - Concurrency: Scales linearly with CPU cores (typically 16-64 shards)
133 ///
134 /// Invariant: Only contains healthy connections (unhealthy removed on error)
135 connection_tables: Arc<DashMap<CheetahString /* ip:port */, Client<PR>>>,
136
137 /// List of all nameserver addresses (in priority order)
138 ///
139 /// Updated via `update_name_server_address_list()`
140 namesrv_addr_list: ArcMut<Vec<CheetahString>>,
141
142 /// Currently selected nameserver (cached for fast path)
143 ///
144 /// May be `None` if no nameserver available or all unhealthy
145 namesrv_addr_choosed: ArcMut<Option<CheetahString>>,
146
147 /// Set of healthy/reachable nameservers
148 ///
149 /// Updated asynchronously by health check task (`scan_available_name_srv`)
150 available_namesrv_addr_set: ArcMut<HashSet<CheetahString>>,
151
152 /// Round-robin index for nameserver selection
153 ///
154 /// ⚠️ Deprecated: Use `latency_tracker` for smart selection
155 namesrv_index: Arc<AtomicI32>,
156
157 /// Latency tracker for smart nameserver selection
158 ///
159 /// Tracks P99 latency and error rates to select the best nameserver
160 latency_tracker: LatencyTracker,
161
162 /// Circuit breakers per address to prevent cascading failures
163 ///
164 /// Maps address to circuit breaker state for auto-reconnection
165 circuit_breakers: Arc<DashMap<CheetahString, CircuitBreaker>>,
166
167 /// Advanced connection pool with metrics and lifecycle management
168 ///
169 /// **Optional Feature**: Provides enhanced connection tracking:
170 /// - Idle timeout and automatic cleanup
171 /// - Per-connection metrics (latency, error rate)
172 /// - Pool-level statistics (utilization, health)
173 ///
174 /// **Usage**: Call `enable_connection_pool()` to activate
175 connection_pool: Option<ConnectionPool<PR>>,
176
177 /// Background task runtime for health checks and cleanup
178 ///
179 /// `None` after `shutdown()` is called
180 client_runtime: Option<RocketMQRuntime>,
181
182 /// Shared command handler (processor + response table)
183 ///
184 /// Arc-wrapped to share across all `Client` instances
185 cmd_handler: ArcMut<RemotingGeneralHandler<PR>>,
186
187 /// Optional connection event broadcaster
188 ///
189 /// Used for monitoring and metrics collection
190 tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
191}
192impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
193 pub fn new(tokio_client_config: Arc<TokioClientConfig>, processor: PR) -> Self {
194 Self::new_with_cl(tokio_client_config, processor, None)
195 }
196
197 pub fn new_with_cl(
198 tokio_client_config: Arc<TokioClientConfig>,
199 processor: PR,
200 tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
201 ) -> Self {
202 let handler = RemotingGeneralHandler {
203 request_processor: processor,
204 //shutdown: (),
205 rpc_hooks: vec![],
206 response_table: ArcMut::new(HashMap::with_capacity(512)),
207 };
208 Self {
209 tokio_client_config,
210 connection_tables: Arc::new(DashMap::with_capacity(64)),
211 namesrv_addr_list: ArcMut::new(Default::default()),
212 namesrv_addr_choosed: ArcMut::new(Default::default()),
213 available_namesrv_addr_set: ArcMut::new(Default::default()),
214 namesrv_index: Arc::new(AtomicI32::new(init_value_index())),
215 latency_tracker: LatencyTracker::new(),
216 circuit_breakers: Arc::new(DashMap::with_capacity(64)),
217 connection_pool: None, // Disabled by default, enable via enable_connection_pool()
218 client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")),
219 cmd_handler: ArcMut::new(handler),
220 tx,
221 }
222 }
223}
224
225impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
226 /// Enable advanced connection pool with metrics and automatic cleanup.
227 ///
228 /// # Arguments
229 ///
230 /// * `max_connections` - Maximum number of connections (0 = unlimited)
231 /// * `max_idle_duration` - Idle timeout (e.g., 5 minutes)
232 /// * `cleanup_interval` - Cleanup task interval (e.g., 30 seconds)
233 ///
234 /// # Returns
235 ///
236 /// Task handle for the cleanup background task (can be aborted)
237 ///
238 /// # Example
239 ///
240 /// ```rust,ignore
241 /// # use rocketmq_remoting::clients::RocketmqDefaultClient;
242 /// # use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
243 /// # use std::sync::Arc;
244 /// # use std::time::Duration;
245 /// # async fn example() {
246 /// let client =
247 /// RocketmqDefaultClient::new(Arc::new(TokioClientConfig::default()), Default::default());
248 ///
249 /// // Enable connection pool with:
250 /// // - Max 1000 connections
251 /// // - 5 minute idle timeout
252 /// // - 30 second cleanup interval
253 /// let cleanup_task =
254 /// client.enable_connection_pool(1000, Duration::from_secs(300), Duration::from_secs(30));
255 ///
256 /// // ... use client ...
257 ///
258 /// // Stop cleanup when shutting down
259 /// cleanup_task.abort();
260 /// # }
261 /// ```
262 pub fn enable_connection_pool(
263 &mut self,
264 max_connections: usize,
265 max_idle_duration: Duration,
266 cleanup_interval: Duration,
267 ) -> tokio::task::JoinHandle<()> {
268 let pool = ConnectionPool::new(max_connections, max_idle_duration);
269 let cleanup_task = pool.start_cleanup_task(cleanup_interval);
270 self.connection_pool = Some(pool);
271 info!(
272 "Connection pool enabled: max={}, idle_timeout={:?}, cleanup_interval={:?}",
273 max_connections, max_idle_duration, cleanup_interval
274 );
275 cleanup_task
276 }
277
278 /// Get connection pool statistics (if enabled).
279 ///
280 /// # Returns
281 ///
282 /// * `Some(stats)` - Pool statistics
283 /// * `None` - Connection pool not enabled
284 ///
285 /// # Example
286 ///
287 /// ```rust,ignore
288 /// # use rocketmq_remoting::clients::RocketmqDefaultClient;
289 /// # fn example(client: &RocketmqDefaultClient) {
290 /// if let Some(stats) = client.get_pool_stats() {
291 /// println!(
292 /// "Pool: {}/{} connections ({:.1}% util)",
293 /// stats.active(),
294 /// stats.max_connections,
295 /// stats.utilization() * 100.0
296 /// );
297 /// println!("Error rate: {:.2}%", stats.error_rate() * 100.0);
298 /// }
299 /// # }
300 /// ```
301 pub fn get_pool_stats(&self) -> Option<crate::clients::connection_pool::PoolStats> {
302 self.connection_pool.as_ref().map(|pool| pool.stats())
303 }
304
305 /// Get or create connection to a healthy nameserver using smart latency-based selection.
306 ///
307 /// # Selection Strategy
308 ///
309 /// **Latency-based** (NEW): Selects lowest P99 latency nameserver
310 /// ```text
311 /// namesrv_list = [ns1, ns2, ns3]
312 /// Metrics:
313 /// ns1: P99=5ms, errors=0
314 /// ns2: P99=50ms, errors=0
315 /// ns3: P99=10ms, errors=3 (unhealthy)
316 ///
317 /// Selection: ns1 (lowest latency + healthy)
318 /// ```
319 ///
320 /// **Scoring Formula**:
321 /// ```text
322 /// score = P99_latency_ms + (consecutive_errors × 100)
323 /// ```
324 ///
325 /// **Fallback**: If no metrics available, uses first nameserver
326 ///
327 /// # Performance Notes
328 ///
329 /// - **Lock Minimization**: Drops lock before expensive `create_client()`
330 /// - **Smart Selection**: O(N) where N = nameserver count (typically <10)
331 /// - **Caching**: Reuses `namesrv_addr_choosed` for fast path
332 ///
333 /// # Returns
334 ///
335 /// * `Some(client)` - Connected to healthy nameserver
336 /// * `None` - No nameservers available or all unhealthy
337 async fn get_and_create_nameserver_client(&self) -> Option<Client<PR>> {
338 // Try cached nameserver ===
339 let cached_addr = self.namesrv_addr_choosed.as_ref().clone();
340
341 if let Some(ref addr) = cached_addr {
342 // Quick lookup in connection pool (lock-free with DashMap)
343 if let Some(client) = self.connection_tables.get(addr) {
344 if client.connection().is_healthy() && self.latency_tracker.is_healthy(addr) {
345 // Fast path: Cached nameserver is healthy
346 return Some(client.value().clone());
347 }
348 debug!("Cached nameserver {} is unhealthy, selecting new one", addr);
349 }
350 }
351
352 // Smart nameserver selection ===
353 let addr_list = self.namesrv_addr_list.as_ref();
354
355 if addr_list.is_empty() {
356 warn!("No nameservers configured in namesrv_addr_list");
357 return None;
358 }
359
360 // Use latency tracker to select best nameserver
361 let selected_addr = self.latency_tracker.select_best(addr_list)?;
362
363 info!(
364 "Selected nameserver: {} (P99: {:?}, errors: {})",
365 selected_addr,
366 self.latency_tracker
367 .get_p99(selected_addr)
368 .unwrap_or(Duration::from_secs(0)),
369 self.latency_tracker.get_error_count(selected_addr)
370 );
371
372 // Update cached selection
373 self.namesrv_addr_choosed
374 .mut_from_ref()
375 .replace(selected_addr.clone());
376
377 //Create connection to selected nameserver ===
378 self.create_client(
379 selected_addr,
380 Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
381 )
382 .await
383 }
384
385 /// Get existing healthy client or create new connection.
386 ///
387 /// # Flow
388 /// 1. If `addr` is `None` or empty, route to nameserver
389 /// 2. Check connection pool for existing client
390 /// 3. Verify client health (connection.is_healthy() == true)
391 /// 4. If unhealthy or missing, create new connection
392 ///
393 /// # Performance
394 /// - **Fast path**: Single lock acquire + HashMap lookup + health check (< 100ns)
395 /// - **Slow path**: Lock + TCP handshake + TLS (if enabled) (10-50ms)
396 ///
397 /// # Lock Optimization
398 /// Current: Hold lock during `get()` and `clone()`
399 /// TODO: Use `DashMap` for lock-free read path:
400 /// ```rust,ignore
401 /// if let Some(client) = self.connection_tables.get(addr) {
402 /// if client.connection().is_healthy() {
403 /// return Some(client.clone());
404 /// }
405 /// }
406 /// ```
407 async fn get_and_create_client(&self, addr: Option<&CheetahString>) -> Option<Client<PR>> {
408 // HOT PATH: Most requests hit this method
409
410 // Route empty addresses to nameserver
411 let target_addr = match addr {
412 None => return self.get_and_create_nameserver_client().await,
413 Some(addr) if addr.is_empty() => return self.get_and_create_nameserver_client().await,
414 Some(addr) => addr,
415 };
416
417 // Fast path: Check connection pool (lock-free with DashMap)
418 if let Some(client_ref) = self.connection_tables.get(target_addr) {
419 let client = client_ref.value().clone();
420 if client.connection().is_healthy() {
421 return Some(client); // Return healthy cached client
422 }
423 // Client unhealthy - will create new connection
424 debug!(
425 "Cached client for {} is unhealthy, reconnecting...",
426 target_addr
427 );
428 }
429
430 // Slow path: Create new connection
431 self.create_client(
432 target_addr,
433 Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
434 )
435 .await
436 }
437
438 /// Create new client connection with double-checked locking pattern.
439 ///
440 /// # Concurrency Strategy
441 ///
442 /// Uses **double-checked locking** to prevent thundering herd:
443 /// 1. **Check 1**: Quick lookup before TCP connect (avoids redundant connects)
444 /// 2. **Release lock**: Perform TCP connect WITHOUT holding lock
445 /// 3. **Check 2**: Re-acquire lock and verify no other task created connection
446 /// 4. **Insert**: Store new client in pool
447 ///
448 /// # Performance
449 ///
450 /// **Before (holding lock during connect)**:
451 /// ```text
452 /// Thread 1: [====== LOCK ======][==== CONNECT (50ms) ====][==== INSERT ====]
453 /// Thread 2: [waiting.....................][LOCK]
454 /// Thread 3: [waiting.....................][LOCK]
455 /// Total: ~50ms * 3 = 150ms wasted
456 /// ```
457 ///
458 /// **After (lock-free connect)**:
459 /// ```text
460 /// Thread 1: [== LOCK ==][RELEASE]→[CONNECT 50ms]→[LOCK][INSERT]
461 /// Thread 2: [== LOCK ==][RELEASE]→[CONNECT 50ms]→[LOCK][cached!]
462 /// Thread 3: [== LOCK ==][RELEASE]→[CONNECT 50ms]→[LOCK][cached!]
463 /// Total: ~50ms + small lock overhead
464 /// ```
465 ///
466 /// # Arguments
467 ///
468 /// * `addr` - Target address (e.g., "127.0.0.1:10911")
469 /// * `duration` - Connection timeout
470 ///
471 /// # Returns
472 ///
473 /// * `Some(client)` - Successfully connected (either new or cached)
474 /// * `None` - Connection failed or timed out (or circuit breaker OPEN)
475 async fn create_client(&self, addr: &CheetahString, duration: Duration) -> Option<Client<PR>> {
476 // Try connection pool first (if enabled) ===
477 if let Some(ref pool) = self.connection_pool {
478 if let Some(pooled_conn) = pool.get(addr) {
479 if pooled_conn.is_healthy() {
480 debug!("Reusing pooled connection to {}", addr);
481 return Some(pooled_conn.client().clone());
482 }
483 // Unhealthy connection - remove from pool
484 pool.remove(addr);
485 }
486 }
487
488 // Double-check before expensive connect (lock-free with DashMap) ===
489
490 // Check if healthy client already exists (fallback to DashMap)
491 if let Some(client_ref) = self.connection_tables.get(addr) {
492 let client = client_ref.value().clone();
493 if client.connection().is_healthy() {
494 return Some(client);
495 }
496 // Client unhealthy - remove it immediately (DashMap allows concurrent removal)
497 drop(client_ref); // Release read guard before removal
498 self.connection_tables.remove(addr);
499 }
500
501 // check circuit breaker ===
502 // Get or create circuit breaker for this address
503 let mut breaker = self
504 .circuit_breakers
505 .entry(addr.clone())
506 .or_insert_with(CircuitBreaker::default_breaker)
507 .clone();
508
509 // Check if request allowed (CLOSED or HALF_OPEN)
510 if !breaker.allow_request() {
511 warn!(
512 "Circuit breaker OPEN for {}, rejecting connection attempt",
513 addr
514 );
515 return None;
516 }
517
518 // Perform TCP connect WITHOUT holding lock ===
519 let addr_inner = addr.to_string();
520
521 let connect_result = time::timeout(duration, async {
522 Client::connect(addr_inner, self.cmd_handler.clone(), self.tx.as_ref()).await
523 })
524 .await;
525
526 // Handle connection result and update circuit breaker ===
527 match connect_result {
528 Ok(Ok(new_client)) => {
529 // Connection successful - record success in circuit breaker
530 breaker.record_success();
531 self.circuit_breakers.insert(addr.clone(), breaker);
532
533 // Insert into connection pool (if enabled) ===
534 if let Some(ref pool) = self.connection_pool {
535 if pool.insert(addr.clone(), new_client.clone()) {
536 info!(
537 "Added connection to pool: {} (pool size: {})",
538 addr,
539 pool.stats().total
540 );
541 } else {
542 warn!("Connection pool at capacity, falling back to DashMap");
543 }
544 }
545
546 // Insert into DashMap (fallback or dual-store) ===
547 match self.connection_tables.entry(addr.clone()) {
548 dashmap::mapref::entry::Entry::Occupied(mut entry) => {
549 // Check if existing is still healthy
550 if entry.get().connection().is_healthy() {
551 info!("Race condition: {} already connected by another task", addr);
552 return Some(entry.get().clone());
553 }
554 // Replace unhealthy with new client
555 entry.insert(new_client.clone());
556 }
557 dashmap::mapref::entry::Entry::Vacant(entry) => {
558 entry.insert(new_client.clone());
559 }
560 }
561
562 info!("Successfully created client for {}", addr);
563 Some(new_client)
564 }
565 Ok(Err(e)) => {
566 // Connection failed - record failure in circuit breaker
567 error!("Failed to connect to {}: {:?}", addr, e);
568 breaker.record_failure();
569 self.circuit_breakers.insert(addr.clone(), breaker);
570 None
571 }
572 Err(_) => {
573 // Timeout - record failure in circuit breaker
574 error!("Connection to {} timed out after {:?}", addr, duration);
575 breaker.record_failure();
576 self.circuit_breakers.insert(addr.clone(), breaker);
577 None
578 }
579 }
580 }
581
582 /// Creates a client with automatic retry using exponential backoff.
583 ///
584 /// # Arguments
585 ///
586 /// * `addr` - Target address
587 /// * `duration` - Connection timeout per attempt
588 /// * `max_attempts` - Maximum retry attempts (0 = use circuit breaker only)
589 ///
590 /// # Returns
591 ///
592 /// * `Some(client)` - Successfully connected (possibly after retries)
593 /// * `None` - All attempts failed or circuit breaker blocked
594 ///
595 /// # Example
596 ///
597 /// ```ignore
598 /// // Retry up to 3 times with exponential backoff (1s, 2s, 4s)
599 /// let client = self.create_client_with_retry(addr, Duration::from_secs(5), 3).await;
600 /// ```
601 async fn create_client_with_retry(
602 &self,
603 addr: &CheetahString,
604 duration: Duration,
605 max_attempts: u32,
606 ) -> Option<Client<PR>> {
607 use crate::clients::reconnect::ExponentialBackoff;
608
609 let mut backoff = ExponentialBackoff::new(
610 Duration::from_secs(1), // Initial delay
611 Duration::from_secs(10), // Max delay
612 max_attempts,
613 );
614
615 loop {
616 // Try to create client
617 if let Some(client) = self.create_client(addr, duration).await {
618 return Some(client);
619 }
620
621 // Check if should retry
622 if let Some(delay) = backoff.next_delay() {
623 debug!(
624 "Connection to {} failed, retrying in {:?} (attempt {}/{})",
625 addr,
626 delay,
627 backoff.current_attempt(),
628 max_attempts
629 );
630 time::sleep(delay).await;
631 } else {
632 warn!(
633 "Connection to {} failed after {} attempts",
634 addr,
635 backoff.current_attempt()
636 );
637 return None;
638 }
639 }
640 }
641
642 /// Background task: Continuously scan nameservers to update availability set.
643 ///
644 /// # Purpose
645 ///
646 /// Maintains `available_namesrv_addr_set` by probing all configured nameservers
647 /// and marking them as available/unavailable based on connection health.
648 ///
649 /// # Algorithm
650 ///
651 /// ```text
652 /// 1. Cleanup phase: Remove stale entries not in namesrv_addr_list
653 /// 2. Probe phase: Test connection to each nameserver
654 /// 3. Update phase: Add/remove from available_namesrv_addr_set
655 /// ```
656 ///
657 /// # Performance
658 ///
659 /// - **Frequency**: Called every `connect_timeout_millis` (typically 3s)
660 /// - **Concurrency**: Sequential probes (TODO: parallelize with `join_all`)
661 /// - **Overhead**: O(N) where N = number of nameservers (typically < 10)
662 ///
663 /// # Optimization Opportunities
664 ///
665 /// 1. **Parallel probes**: Use `futures::future::join_all` to probe all nameservers
666 /// concurrently
667 /// 2. **Smart backoff**: Skip probing recently-successful servers
668 /// 3. **Metrics**: Track probe latency for latency-based selection
669 ///
670 /// # Example Timeline
671 ///
672 /// ```text
673 /// T+0s: Start scan
674 /// T+0s: Cleanup: Remove ["old-ns:9876"]
675 /// T+0s: Probe ns1 → Success (mark available)
676 /// T+50ms: Probe ns2 → Timeout (mark unavailable)
677 /// T+100ms: Probe ns3 → Success (mark available)
678 /// T+100ms: Scan complete
679 /// T+3s: Next scan begins...
680 /// ```
681 async fn scan_available_name_srv(&self) {
682 let addr_list = self.namesrv_addr_list.as_ref();
683
684 if addr_list.is_empty() {
685 debug!("No nameservers configured, skipping availability scan");
686 return;
687 }
688
689 // Cleanup - Remove stale entries ===
690 // Collect addresses to remove (avoid holding borrow during mutation)
691 let stale_addrs: Vec<CheetahString> = self
692 .available_namesrv_addr_set
693 .as_ref()
694 .iter()
695 .filter(|addr| !addr_list.contains(addr))
696 .cloned()
697 .collect();
698
699 for stale_addr in stale_addrs {
700 warn!(
701 "Removing stale nameserver from available set: {}",
702 stale_addr
703 );
704 self.available_namesrv_addr_set
705 .mut_from_ref()
706 .remove(&stale_addr);
707 }
708
709 // Parallel probe all configured nameservers ===
710 // Parallel probing reduces scan time from O(N * 50ms) to O(max(50ms))
711 use futures::future::join_all;
712
713 let probe_futures: Vec<_> = addr_list
714 .iter()
715 .map(|addr| {
716 let addr_clone = addr.clone();
717 async move {
718 let result = self.get_and_create_client(Some(&addr_clone)).await;
719 (addr_clone, result.is_some())
720 }
721 })
722 .collect();
723
724 // Execute all probes concurrently
725 let results = join_all(probe_futures).await;
726
727 // Update availability set based on probe results
728 for (namesrv_addr, is_available) in results {
729 if is_available {
730 // Connection successful - mark as available
731 if self
732 .available_namesrv_addr_set
733 .mut_from_ref()
734 .insert(namesrv_addr.clone())
735 {
736 info!("Nameserver {} is now available", namesrv_addr);
737 }
738 } else {
739 // Connection failed - mark as unavailable
740 if self
741 .available_namesrv_addr_set
742 .mut_from_ref()
743 .remove(&namesrv_addr)
744 {
745 warn!("Nameserver {} is now unavailable", namesrv_addr);
746 }
747 }
748 }
749
750 debug!(
751 "Availability scan complete: {}/{} nameservers available",
752 self.available_namesrv_addr_set.as_ref().len(),
753 addr_list.len()
754 );
755 }
756}
757
758#[allow(unused_variables)]
759impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR> {
760 async fn start(&self, this: WeakArcMut<Self>) {
761 if let Some(client) = this.upgrade() {
762 let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
763 self.client_runtime
764 .as_ref()
765 .unwrap()
766 .get_handle()
767 .spawn(async move {
768 loop {
769 client.scan_available_name_srv().await;
770 time::sleep(Duration::from_millis(connect_timeout_millis)).await;
771 }
772 });
773 }
774 }
775
776 fn shutdown(&mut self) {
777 if let Some(rt) = self.client_runtime.take() {
778 rt.shutdown();
779 }
780 // DashMap::clear() is already thread-safe, no need for async lock
781 self.connection_tables.clear();
782 self.namesrv_addr_list.clear();
783 self.available_namesrv_addr_set.clear();
784
785 info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<");
786 }
787
788 fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>) {
789 self.cmd_handler.register_rpc_hook(hook);
790 }
791
792 fn clear_rpc_hook(&mut self) {
793 todo!()
794 }
795}
796
797#[allow(unused_variables)]
798impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqDefaultClient<PR> {
799 async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>) {
800 let old = self.namesrv_addr_list.mut_from_ref();
801 let mut update = false;
802
803 if !addrs.is_empty() {
804 if old.is_empty() || addrs.len() != old.len() {
805 update = true;
806 } else {
807 for addr in &addrs {
808 if !old.contains(addr) {
809 update = true;
810 break;
811 }
812 }
813 }
814
815 if update {
816 // Shuffle the addresses
817 // Shuffle logic is not implemented here as it is not available in standard library
818 // You can implement it using various algorithms like Fisher-Yates shuffle
819
820 info!(
821 "name remoting_server address updated. NEW : {:?} , OLD: {:?}",
822 addrs, old
823 );
824 /* let mut rng = thread_rng();
825 addrs.shuffle(&mut rng);*/
826 self.namesrv_addr_list.mut_from_ref().extend(addrs.clone());
827
828 // should close the channel if choosed addr is not exist.
829 if let Some(namesrv_addr) = self.namesrv_addr_choosed.as_ref() {
830 if !addrs.contains(namesrv_addr) {
831 // DashMap allows direct removal without collecting
832 self.connection_tables.remove(namesrv_addr);
833 }
834 }
835 }
836 }
837 }
838
839 fn get_name_server_address_list(&self) -> &[CheetahString] {
840 self.namesrv_addr_list.as_ref()
841 }
842
843 fn get_available_name_srv_list(&self) -> Vec<CheetahString> {
844 self.available_namesrv_addr_set
845 .as_ref()
846 .clone()
847 .into_iter()
848 .collect()
849 }
850
851 /// Send request and wait for response with timeout.
852 ///
853 /// # HOT PATH
854 /// This is the primary client API - optimize for low latency and high throughput.
855 ///
856 /// # Flow
857 /// ```text
858 /// 1. Get/create client connection (~100ns fast path, ~50ms slow)
859 /// 2. Spawn send task on runtime (~10μs)
860 /// 3. Apply timeout wrapper (~100ns)
861 /// 4. Wait for response via oneshot (network RTT + processing)
862 /// 5. Unwrap nested Result layers (~10ns)
863 /// ```
864 ///
865 /// # Performance Optimizations
866 ///
867 /// - **Early validation**: Check client availability before expensive spawn
868 /// - **Flat error handling**: Reduce nested `match` overhead
869 /// - **Direct await**: Spawn only for timeout enforcement (could be optimized further)
870 ///
871 /// # Error Handling
872 ///
873 /// Returns `RocketmqError::RemoteError` for all failures:
874 /// - Client unavailable (no connection)
875 /// - Network I/O error (send/recv failure)
876 /// - Timeout (no response within deadline)
877 /// - Task spawn error (runtime shutdown)
878 ///
879 /// # Arguments
880 ///
881 /// * `addr` - Target address (None = use nameserver)
882 /// * `request` - Command to send
883 /// * `timeout_millis` - Max wait time for response
884 ///
885 /// # Examples
886 ///
887 /// ```rust,ignore
888 /// # use rocketmq_remoting::clients::RocketmqDefaultClient;
889 /// # use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
890 /// # async fn example(client: &RocketmqDefaultClient) -> Result<(), Box<dyn std::error::Error>> {
891 /// let request = RemotingCommand::create_request_command(/* ... */);
892 /// let response = client.invoke_request(
893 /// Some(&"127.0.0.1:10911".into()),
894 /// request,
895 /// 3000 // 3 second timeout
896 /// ).await?;
897 /// # Ok(())
898 /// # }
899 /// ```
900 async fn invoke_request(
901 &self,
902 addr: Option<&CheetahString>,
903 request: RemotingCommand,
904 timeout_millis: u64,
905 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
906 // Record start time for latency tracking
907 let start = time::Instant::now();
908
909 // Determine target address (for metrics recording)
910 let target_addr = addr
911 .cloned()
912 .or_else(|| self.namesrv_addr_choosed.as_ref().clone());
913
914 // === Get client connection ===
915 let mut client = self.get_and_create_client(addr).await.ok_or_else(|| {
916 let target = addr.map(|a| a.as_str()).unwrap_or("<nameserver>");
917 error!("Failed to get client for {}", target);
918
919 // Record connection error
920 if let Some(ref addr) = target_addr {
921 self.latency_tracker.record_error(addr);
922 }
923
924 rocketmq_error::RocketMQError::network_connection_failed(
925 target.to_string(),
926 "Failed to connect",
927 )
928 })?;
929
930 // === Send request with timeout ===
931 let runtime = self.client_runtime.as_ref().ok_or_else(|| {
932 error!("Client runtime has been shut down");
933 rocketmq_error::RocketMQError::ClientNotStarted
934 })?;
935
936 let send_task = runtime.get_handle().spawn(async move {
937 // Apply timeout at the send_read level
938 match time::timeout(
939 Duration::from_millis(timeout_millis),
940 client.send_read(request, timeout_millis),
941 )
942 .await
943 {
944 Ok(result) => result,
945 Err(_) => Err(rocketmq_error::RocketMQError::Timeout {
946 operation: "send_request",
947 timeout_ms: timeout_millis,
948 }),
949 }
950 });
951
952 // === Await result and record metrics ===
953 match send_task.await {
954 Ok(send_result) => {
955 let latency = start.elapsed();
956
957 match send_result {
958 Ok(response) => {
959 // Record successful request latency
960 if let Some(ref addr) = target_addr {
961 let latency_ms = latency.as_millis() as u64;
962 self.latency_tracker.record_success(addr, latency);
963
964 // Record in connection pool metrics (if enabled)
965 if let Some(ref pool) = self.connection_pool {
966 pool.record_success(addr, latency_ms);
967 }
968
969 debug!("Request to {} completed in {:?}", addr, latency);
970 }
971 Ok(response)
972 }
973 Err(err) => {
974 // Record error
975 if let Some(ref addr) = target_addr {
976 self.latency_tracker.record_error(addr);
977
978 // Record in connection pool metrics (if enabled)
979 if let Some(ref pool) = self.connection_pool {
980 pool.record_error(addr);
981 }
982
983 warn!("Request to {} failed after {:?}: {:?}", addr, latency, err);
984 }
985 Err(err)
986 }
987 }
988 }
989 Err(join_err) => {
990 // Task panic or cancellation
991 error!("Send task failed: {:?}", join_err);
992
993 // Record error
994 if let Some(ref addr) = target_addr {
995 self.latency_tracker.record_error(addr);
996 }
997
998 Err(rocketmq_error::RocketMQError::Internal(format!(
999 "Send task error: {}",
1000 join_err
1001 )))
1002 }
1003 }
1004 }
1005
1006 async fn invoke_request_oneway(
1007 &self,
1008 addr: &CheetahString,
1009 request: RemotingCommand,
1010 timeout_millis: u64,
1011 ) {
1012 let client = self.get_and_create_client(Some(addr)).await;
1013 match client {
1014 None => {
1015 error!("get client failed");
1016 }
1017 Some(mut client) => {
1018 self.client_runtime
1019 .as_ref()
1020 .unwrap()
1021 .get_handle()
1022 .spawn(async move {
1023 match time::timeout(Duration::from_millis(timeout_millis), async move {
1024 let mut request = request;
1025 request.mark_oneway_rpc_ref();
1026 client.send(request).await
1027 })
1028 .await
1029 {
1030 Ok(_) => Ok::<(), rocketmq_error::RocketMQError>(()),
1031 Err(_) => Err(rocketmq_error::RocketMQError::Timeout {
1032 operation: "send_oneway",
1033 timeout_ms: timeout_millis,
1034 }),
1035 }
1036 });
1037 }
1038 }
1039 }
1040
1041 fn is_address_reachable(&mut self, addr: &CheetahString) {
1042 todo!()
1043 }
1044
1045 fn close_clients(&mut self, addrs: Vec<String>) {
1046 todo!()
1047 }
1048
1049 fn register_processor(&mut self, processor: impl RequestProcessor + Sync) {
1050 todo!()
1051 }
1052}
1053
1054fn init_value_index() -> i32 {
1055 let mut rng = rand::rng();
1056 rng.random_range(0..999)
1057}