rocketmq_remoting/clients/
rocketmq_tokio_client.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::sync::atomic::AtomicI32;
20use std::sync::Arc;
21use std::time::Duration;
22
23use cheetah_string::CheetahString;
24use dashmap::DashMap;
25use rand::Rng;
26use rocketmq_runtime::RocketMQRuntime;
27use rocketmq_rust::ArcMut;
28use rocketmq_rust::WeakArcMut;
29use tokio::time;
30use tracing::debug;
31use tracing::error;
32use tracing::info;
33use tracing::warn;
34
35use crate::base::connection_net_event::ConnectionNetEvent;
36use crate::clients::connection_pool::ConnectionPool;
37use crate::clients::nameserver_selector::LatencyTracker;
38use crate::clients::reconnect::CircuitBreaker;
39use crate::clients::Client;
40use crate::clients::RemotingClient;
41use crate::protocol::remoting_command::RemotingCommand;
42use crate::remoting::inner::RemotingGeneralHandler;
43use crate::remoting::RemotingService;
44use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
45use crate::runtime::config::client_config::TokioClientConfig;
46use crate::runtime::processor::RequestProcessor;
47use crate::runtime::RPCHook;
48
49const LOCK_TIMEOUT_MILLIS: u64 = 3000;
50
51/// High-performance async RocketMQ client with connection pooling and auto-reconnection.
52///
53/// # Architecture
54///
55/// ```text
56/// ┌─────────────────────────────────────────────────────────┐
57/// │            RocketmqDefaultClient<PR>                    │
58/// ├─────────────────────────────────────────────────────────┤
59/// │                                                         │
60/// │  ┌────────────────┐      ┌──────────────────┐         │
61/// │  │ Connection Pool│ ───► │NameServer Router │         │
62/// │  │  (DashMap)     │      │  (Health-based)  │         │
63/// │  └────────────────┘      └──────────────────┘         │
64/// │         │                         │                    │
65/// │         ↓                         ↓                    │
66/// │  ┌────────────────┐      ┌──────────────────┐         │
67/// │  │ Request Handler│ ───► │  Response Table  │         │
68/// │  │  (async tasks) │      │   (oneshot rx)   │         │
69/// │  └────────────────┘      └──────────────────┘         │
70/// │                                                         │
71/// └─────────────────────────────────────────────────────────┘
72/// ```
73///
74/// # Key Features
75///
76/// - **Connection Pooling**: Reuses TCP connections to brokers/nameservers
77/// - **Auto-Reconnection**: Exponential backoff retry on connection failures
78/// - **Smart Routing**: Selects healthiest nameserver based on latency/errors
79/// - **Request Multiplexing**: Multiple concurrent requests per connection
80/// - **Graceful Shutdown**: Drains in-flight requests before closing
81///
82/// # Performance Characteristics
83///
84/// - **Lock Contention**: Uses `Arc<Mutex<HashMap>>` for connection pool
85///   - ⚠️ TODO: Migrate to `DashMap` for lock-free reads
86/// - **Memory**: O(N) where N = number of unique broker addresses
87/// - **Latency**: Single async hop for cached connections, 2-3 hops for new
88///
89/// # Type Parameters
90///
91/// * `PR` - Request processor type (default: `DefaultRemotingRequestProcessor`)
92///
93/// # Example
94///
95/// ```rust,ignore
96/// use std::sync::Arc;
97///
98/// use rocketmq_remoting::clients::RocketmqDefaultClient;
99/// use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
100///
101/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
102/// let config = Arc::new(TokioClientConfig::default());
103/// let processor = Default::default();
104/// let client = RocketmqDefaultClient::new(config, processor);
105///
106/// // Update nameserver list
107/// client
108///     .update_name_server_address_list(vec!["127.0.0.1:9876".into()])
109///     .await;
110///
111/// // Send request
112/// let response = client
113///     .invoke_request(
114///         None, // use default nameserver
115///         request, 3000, // 3s timeout
116///     )
117///     .await?;
118/// # Ok(())
119/// # }
120/// ```
121pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
122    /// Client configuration (timeouts, buffer sizes, etc.)
123    ///
124    /// Shared across all connections to avoid duplication
125    tokio_client_config: Arc<TokioClientConfig>,
126
127    /// Connection pool: `addr -> Client` mapping
128    ///
129    /// **Lock-Free Design**: Uses `DashMap` for concurrent access without Mutex
130    /// - Read operations (get): Zero-lock overhead
131    /// - Write operations (insert/remove): Fine-grained per-shard locking
132    /// - Concurrency: Scales linearly with CPU cores (typically 16-64 shards)
133    ///
134    /// Invariant: Only contains healthy connections (unhealthy removed on error)
135    connection_tables: Arc<DashMap<CheetahString /* ip:port */, Client<PR>>>,
136
137    /// List of all nameserver addresses (in priority order)
138    ///
139    /// Updated via `update_name_server_address_list()`
140    namesrv_addr_list: ArcMut<Vec<CheetahString>>,
141
142    /// Currently selected nameserver (cached for fast path)
143    ///
144    /// May be `None` if no nameserver available or all unhealthy
145    namesrv_addr_choosed: ArcMut<Option<CheetahString>>,
146
147    /// Set of healthy/reachable nameservers
148    ///
149    /// Updated asynchronously by health check task (`scan_available_name_srv`)
150    available_namesrv_addr_set: ArcMut<HashSet<CheetahString>>,
151
152    /// Round-robin index for nameserver selection
153    ///
154    /// ⚠️ Deprecated: Use `latency_tracker` for smart selection
155    namesrv_index: Arc<AtomicI32>,
156
157    /// Latency tracker for smart nameserver selection
158    ///
159    /// Tracks P99 latency and error rates to select the best nameserver
160    latency_tracker: LatencyTracker,
161
162    /// Circuit breakers per address to prevent cascading failures
163    ///
164    /// Maps address to circuit breaker state for auto-reconnection
165    circuit_breakers: Arc<DashMap<CheetahString, CircuitBreaker>>,
166
167    /// Advanced connection pool with metrics and lifecycle management
168    ///
169    /// **Optional Feature**: Provides enhanced connection tracking:
170    /// - Idle timeout and automatic cleanup
171    /// - Per-connection metrics (latency, error rate)
172    /// - Pool-level statistics (utilization, health)
173    ///
174    /// **Usage**: Call `enable_connection_pool()` to activate
175    connection_pool: Option<ConnectionPool<PR>>,
176
177    /// Background task runtime for health checks and cleanup
178    ///
179    /// `None` after `shutdown()` is called
180    client_runtime: Option<RocketMQRuntime>,
181
182    /// Shared command handler (processor + response table)
183    ///
184    /// Arc-wrapped to share across all `Client` instances
185    cmd_handler: ArcMut<RemotingGeneralHandler<PR>>,
186
187    /// Optional connection event broadcaster
188    ///
189    /// Used for monitoring and metrics collection
190    tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
191}
192impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
193    pub fn new(tokio_client_config: Arc<TokioClientConfig>, processor: PR) -> Self {
194        Self::new_with_cl(tokio_client_config, processor, None)
195    }
196
197    pub fn new_with_cl(
198        tokio_client_config: Arc<TokioClientConfig>,
199        processor: PR,
200        tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
201    ) -> Self {
202        let handler = RemotingGeneralHandler {
203            request_processor: processor,
204            //shutdown: (),
205            rpc_hooks: vec![],
206            response_table: ArcMut::new(HashMap::with_capacity(512)),
207        };
208        Self {
209            tokio_client_config,
210            connection_tables: Arc::new(DashMap::with_capacity(64)),
211            namesrv_addr_list: ArcMut::new(Default::default()),
212            namesrv_addr_choosed: ArcMut::new(Default::default()),
213            available_namesrv_addr_set: ArcMut::new(Default::default()),
214            namesrv_index: Arc::new(AtomicI32::new(init_value_index())),
215            latency_tracker: LatencyTracker::new(),
216            circuit_breakers: Arc::new(DashMap::with_capacity(64)),
217            connection_pool: None, // Disabled by default, enable via enable_connection_pool()
218            client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")),
219            cmd_handler: ArcMut::new(handler),
220            tx,
221        }
222    }
223}
224
225impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
226    /// Enable advanced connection pool with metrics and automatic cleanup.
227    ///
228    /// # Arguments
229    ///
230    /// * `max_connections` - Maximum number of connections (0 = unlimited)
231    /// * `max_idle_duration` - Idle timeout (e.g., 5 minutes)
232    /// * `cleanup_interval` - Cleanup task interval (e.g., 30 seconds)
233    ///
234    /// # Returns
235    ///
236    /// Task handle for the cleanup background task (can be aborted)
237    ///
238    /// # Example
239    ///
240    /// ```rust,ignore
241    /// # use rocketmq_remoting::clients::RocketmqDefaultClient;
242    /// # use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
243    /// # use std::sync::Arc;
244    /// # use std::time::Duration;
245    /// # async fn example() {
246    /// let client =
247    ///     RocketmqDefaultClient::new(Arc::new(TokioClientConfig::default()), Default::default());
248    ///
249    /// // Enable connection pool with:
250    /// // - Max 1000 connections
251    /// // - 5 minute idle timeout
252    /// // - 30 second cleanup interval
253    /// let cleanup_task =
254    ///     client.enable_connection_pool(1000, Duration::from_secs(300), Duration::from_secs(30));
255    ///
256    /// // ... use client ...
257    ///
258    /// // Stop cleanup when shutting down
259    /// cleanup_task.abort();
260    /// # }
261    /// ```
262    pub fn enable_connection_pool(
263        &mut self,
264        max_connections: usize,
265        max_idle_duration: Duration,
266        cleanup_interval: Duration,
267    ) -> tokio::task::JoinHandle<()> {
268        let pool = ConnectionPool::new(max_connections, max_idle_duration);
269        let cleanup_task = pool.start_cleanup_task(cleanup_interval);
270        self.connection_pool = Some(pool);
271        info!(
272            "Connection pool enabled: max={}, idle_timeout={:?}, cleanup_interval={:?}",
273            max_connections, max_idle_duration, cleanup_interval
274        );
275        cleanup_task
276    }
277
278    /// Get connection pool statistics (if enabled).
279    ///
280    /// # Returns
281    ///
282    /// * `Some(stats)` - Pool statistics
283    /// * `None` - Connection pool not enabled
284    ///
285    /// # Example
286    ///
287    /// ```rust,ignore
288    /// # use rocketmq_remoting::clients::RocketmqDefaultClient;
289    /// # fn example(client: &RocketmqDefaultClient) {
290    /// if let Some(stats) = client.get_pool_stats() {
291    ///     println!(
292    ///         "Pool: {}/{} connections ({:.1}% util)",
293    ///         stats.active(),
294    ///         stats.max_connections,
295    ///         stats.utilization() * 100.0
296    ///     );
297    ///     println!("Error rate: {:.2}%", stats.error_rate() * 100.0);
298    /// }
299    /// # }
300    /// ```
301    pub fn get_pool_stats(&self) -> Option<crate::clients::connection_pool::PoolStats> {
302        self.connection_pool.as_ref().map(|pool| pool.stats())
303    }
304
305    /// Get or create connection to a healthy nameserver using smart latency-based selection.
306    ///
307    /// # Selection Strategy
308    ///
309    /// **Latency-based** (NEW): Selects lowest P99 latency nameserver
310    /// ```text
311    /// namesrv_list = [ns1, ns2, ns3]
312    /// Metrics:
313    ///   ns1: P99=5ms,  errors=0
314    ///   ns2: P99=50ms, errors=0
315    ///   ns3: P99=10ms, errors=3 (unhealthy)
316    ///
317    /// Selection: ns1 (lowest latency + healthy)
318    /// ```
319    ///
320    /// **Scoring Formula**:
321    /// ```text
322    /// score = P99_latency_ms + (consecutive_errors × 100)
323    /// ```
324    ///
325    /// **Fallback**: If no metrics available, uses first nameserver
326    ///
327    /// # Performance Notes
328    ///
329    /// - **Lock Minimization**: Drops lock before expensive `create_client()`
330    /// - **Smart Selection**: O(N) where N = nameserver count (typically <10)
331    /// - **Caching**: Reuses `namesrv_addr_choosed` for fast path
332    ///
333    /// # Returns
334    ///
335    /// * `Some(client)` - Connected to healthy nameserver
336    /// * `None` - No nameservers available or all unhealthy
337    async fn get_and_create_nameserver_client(&self) -> Option<Client<PR>> {
338        // Try cached nameserver ===
339        let cached_addr = self.namesrv_addr_choosed.as_ref().clone();
340
341        if let Some(ref addr) = cached_addr {
342            // Quick lookup in connection pool (lock-free with DashMap)
343            if let Some(client) = self.connection_tables.get(addr) {
344                if client.connection().is_healthy() && self.latency_tracker.is_healthy(addr) {
345                    // Fast path: Cached nameserver is healthy
346                    return Some(client.value().clone());
347                }
348                debug!("Cached nameserver {} is unhealthy, selecting new one", addr);
349            }
350        }
351
352        // Smart nameserver selection ===
353        let addr_list = self.namesrv_addr_list.as_ref();
354
355        if addr_list.is_empty() {
356            warn!("No nameservers configured in namesrv_addr_list");
357            return None;
358        }
359
360        // Use latency tracker to select best nameserver
361        let selected_addr = self.latency_tracker.select_best(addr_list)?;
362
363        info!(
364            "Selected nameserver: {} (P99: {:?}, errors: {})",
365            selected_addr,
366            self.latency_tracker
367                .get_p99(selected_addr)
368                .unwrap_or(Duration::from_secs(0)),
369            self.latency_tracker.get_error_count(selected_addr)
370        );
371
372        // Update cached selection
373        self.namesrv_addr_choosed
374            .mut_from_ref()
375            .replace(selected_addr.clone());
376
377        //Create connection to selected nameserver ===
378        self.create_client(
379            selected_addr,
380            Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
381        )
382        .await
383    }
384
385    /// Get existing healthy client or create new connection.
386    ///
387    /// # Flow
388    /// 1. If `addr` is `None` or empty, route to nameserver
389    /// 2. Check connection pool for existing client
390    /// 3. Verify client health (connection.is_healthy() == true)
391    /// 4. If unhealthy or missing, create new connection
392    ///
393    /// # Performance
394    /// - **Fast path**: Single lock acquire + HashMap lookup + health check (< 100ns)
395    /// - **Slow path**: Lock + TCP handshake + TLS (if enabled) (10-50ms)
396    ///
397    /// # Lock Optimization
398    /// Current: Hold lock during `get()` and `clone()`
399    /// TODO: Use `DashMap` for lock-free read path:
400    /// ```rust,ignore
401    /// if let Some(client) = self.connection_tables.get(addr) {
402    ///     if client.connection().is_healthy() {
403    ///         return Some(client.clone());
404    ///     }
405    /// }
406    /// ```
407    async fn get_and_create_client(&self, addr: Option<&CheetahString>) -> Option<Client<PR>> {
408        // HOT PATH: Most requests hit this method
409
410        // Route empty addresses to nameserver
411        let target_addr = match addr {
412            None => return self.get_and_create_nameserver_client().await,
413            Some(addr) if addr.is_empty() => return self.get_and_create_nameserver_client().await,
414            Some(addr) => addr,
415        };
416
417        // Fast path: Check connection pool (lock-free with DashMap)
418        if let Some(client_ref) = self.connection_tables.get(target_addr) {
419            let client = client_ref.value().clone();
420            if client.connection().is_healthy() {
421                return Some(client); // Return healthy cached client
422            }
423            // Client unhealthy - will create new connection
424            debug!(
425                "Cached client for {} is unhealthy, reconnecting...",
426                target_addr
427            );
428        }
429
430        // Slow path: Create new connection
431        self.create_client(
432            target_addr,
433            Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
434        )
435        .await
436    }
437
438    /// Create new client connection with double-checked locking pattern.
439    ///
440    /// # Concurrency Strategy
441    ///
442    /// Uses **double-checked locking** to prevent thundering herd:
443    /// 1. **Check 1**: Quick lookup before TCP connect (avoids redundant connects)
444    /// 2. **Release lock**: Perform TCP connect WITHOUT holding lock
445    /// 3. **Check 2**: Re-acquire lock and verify no other task created connection
446    /// 4. **Insert**: Store new client in pool
447    ///
448    /// # Performance
449    ///
450    /// **Before (holding lock during connect)**:
451    /// ```text
452    /// Thread 1: [====== LOCK ======][==== CONNECT (50ms) ====][==== INSERT ====]
453    /// Thread 2:                      [waiting.....................][LOCK]
454    /// Thread 3:                      [waiting.....................][LOCK]
455    /// Total: ~50ms * 3 = 150ms wasted
456    /// ```
457    ///
458    /// **After (lock-free connect)**:
459    /// ```text
460    /// Thread 1: [== LOCK ==][RELEASE]→[CONNECT 50ms]→[LOCK][INSERT]
461    /// Thread 2: [== LOCK ==][RELEASE]→[CONNECT 50ms]→[LOCK][cached!]
462    /// Thread 3: [== LOCK ==][RELEASE]→[CONNECT 50ms]→[LOCK][cached!]
463    /// Total: ~50ms + small lock overhead
464    /// ```
465    ///
466    /// # Arguments
467    ///
468    /// * `addr` - Target address (e.g., "127.0.0.1:10911")
469    /// * `duration` - Connection timeout
470    ///
471    /// # Returns
472    ///
473    /// * `Some(client)` - Successfully connected (either new or cached)
474    /// * `None` - Connection failed or timed out (or circuit breaker OPEN)
475    async fn create_client(&self, addr: &CheetahString, duration: Duration) -> Option<Client<PR>> {
476        // Try connection pool first (if enabled) ===
477        if let Some(ref pool) = self.connection_pool {
478            if let Some(pooled_conn) = pool.get(addr) {
479                if pooled_conn.is_healthy() {
480                    debug!("Reusing pooled connection to {}", addr);
481                    return Some(pooled_conn.client().clone());
482                }
483                // Unhealthy connection - remove from pool
484                pool.remove(addr);
485            }
486        }
487
488        // Double-check before expensive connect (lock-free with DashMap) ===
489
490        // Check if healthy client already exists (fallback to DashMap)
491        if let Some(client_ref) = self.connection_tables.get(addr) {
492            let client = client_ref.value().clone();
493            if client.connection().is_healthy() {
494                return Some(client);
495            }
496            // Client unhealthy - remove it immediately (DashMap allows concurrent removal)
497            drop(client_ref); // Release read guard before removal
498            self.connection_tables.remove(addr);
499        }
500
501        // check circuit breaker ===
502        // Get or create circuit breaker for this address
503        let mut breaker = self
504            .circuit_breakers
505            .entry(addr.clone())
506            .or_insert_with(CircuitBreaker::default_breaker)
507            .clone();
508
509        // Check if request allowed (CLOSED or HALF_OPEN)
510        if !breaker.allow_request() {
511            warn!(
512                "Circuit breaker OPEN for {}, rejecting connection attempt",
513                addr
514            );
515            return None;
516        }
517
518        // Perform TCP connect WITHOUT holding lock ===
519        let addr_inner = addr.to_string();
520
521        let connect_result = time::timeout(duration, async {
522            Client::connect(addr_inner, self.cmd_handler.clone(), self.tx.as_ref()).await
523        })
524        .await;
525
526        // Handle connection result and update circuit breaker ===
527        match connect_result {
528            Ok(Ok(new_client)) => {
529                // Connection successful - record success in circuit breaker
530                breaker.record_success();
531                self.circuit_breakers.insert(addr.clone(), breaker);
532
533                // Insert into connection pool (if enabled) ===
534                if let Some(ref pool) = self.connection_pool {
535                    if pool.insert(addr.clone(), new_client.clone()) {
536                        info!(
537                            "Added connection to pool: {} (pool size: {})",
538                            addr,
539                            pool.stats().total
540                        );
541                    } else {
542                        warn!("Connection pool at capacity, falling back to DashMap");
543                    }
544                }
545
546                // Insert into DashMap (fallback or dual-store) ===
547                match self.connection_tables.entry(addr.clone()) {
548                    dashmap::mapref::entry::Entry::Occupied(mut entry) => {
549                        // Check if existing is still healthy
550                        if entry.get().connection().is_healthy() {
551                            info!("Race condition: {} already connected by another task", addr);
552                            return Some(entry.get().clone());
553                        }
554                        // Replace unhealthy with new client
555                        entry.insert(new_client.clone());
556                    }
557                    dashmap::mapref::entry::Entry::Vacant(entry) => {
558                        entry.insert(new_client.clone());
559                    }
560                }
561
562                info!("Successfully created client for {}", addr);
563                Some(new_client)
564            }
565            Ok(Err(e)) => {
566                // Connection failed - record failure in circuit breaker
567                error!("Failed to connect to {}: {:?}", addr, e);
568                breaker.record_failure();
569                self.circuit_breakers.insert(addr.clone(), breaker);
570                None
571            }
572            Err(_) => {
573                // Timeout - record failure in circuit breaker
574                error!("Connection to {} timed out after {:?}", addr, duration);
575                breaker.record_failure();
576                self.circuit_breakers.insert(addr.clone(), breaker);
577                None
578            }
579        }
580    }
581
582    /// Creates a client with automatic retry using exponential backoff.
583    ///
584    /// # Arguments
585    ///
586    /// * `addr` - Target address
587    /// * `duration` - Connection timeout per attempt
588    /// * `max_attempts` - Maximum retry attempts (0 = use circuit breaker only)
589    ///
590    /// # Returns
591    ///
592    /// * `Some(client)` - Successfully connected (possibly after retries)
593    /// * `None` - All attempts failed or circuit breaker blocked
594    ///
595    /// # Example
596    ///
597    /// ```ignore
598    /// // Retry up to 3 times with exponential backoff (1s, 2s, 4s)
599    /// let client = self.create_client_with_retry(addr, Duration::from_secs(5), 3).await;
600    /// ```
601    async fn create_client_with_retry(
602        &self,
603        addr: &CheetahString,
604        duration: Duration,
605        max_attempts: u32,
606    ) -> Option<Client<PR>> {
607        use crate::clients::reconnect::ExponentialBackoff;
608
609        let mut backoff = ExponentialBackoff::new(
610            Duration::from_secs(1),  // Initial delay
611            Duration::from_secs(10), // Max delay
612            max_attempts,
613        );
614
615        loop {
616            // Try to create client
617            if let Some(client) = self.create_client(addr, duration).await {
618                return Some(client);
619            }
620
621            // Check if should retry
622            if let Some(delay) = backoff.next_delay() {
623                debug!(
624                    "Connection to {} failed, retrying in {:?} (attempt {}/{})",
625                    addr,
626                    delay,
627                    backoff.current_attempt(),
628                    max_attempts
629                );
630                time::sleep(delay).await;
631            } else {
632                warn!(
633                    "Connection to {} failed after {} attempts",
634                    addr,
635                    backoff.current_attempt()
636                );
637                return None;
638            }
639        }
640    }
641
642    /// Background task: Continuously scan nameservers to update availability set.
643    ///
644    /// # Purpose
645    ///
646    /// Maintains `available_namesrv_addr_set` by probing all configured nameservers
647    /// and marking them as available/unavailable based on connection health.
648    ///
649    /// # Algorithm
650    ///
651    /// ```text
652    /// 1. Cleanup phase: Remove stale entries not in namesrv_addr_list
653    /// 2. Probe phase: Test connection to each nameserver
654    /// 3. Update phase: Add/remove from available_namesrv_addr_set
655    /// ```
656    ///
657    /// # Performance
658    ///
659    /// - **Frequency**: Called every `connect_timeout_millis` (typically 3s)
660    /// - **Concurrency**: Sequential probes (TODO: parallelize with `join_all`)
661    /// - **Overhead**: O(N) where N = number of nameservers (typically < 10)
662    ///
663    /// # Optimization Opportunities
664    ///
665    /// 1. **Parallel probes**: Use `futures::future::join_all` to probe all nameservers
666    ///    concurrently
667    /// 2. **Smart backoff**: Skip probing recently-successful servers
668    /// 3. **Metrics**: Track probe latency for latency-based selection
669    ///
670    /// # Example Timeline
671    ///
672    /// ```text
673    /// T+0s:  Start scan
674    /// T+0s:  Cleanup: Remove ["old-ns:9876"]
675    /// T+0s:  Probe ns1 → Success (mark available)
676    /// T+50ms: Probe ns2 → Timeout (mark unavailable)
677    /// T+100ms: Probe ns3 → Success (mark available)
678    /// T+100ms: Scan complete
679    /// T+3s:  Next scan begins...
680    /// ```
681    async fn scan_available_name_srv(&self) {
682        let addr_list = self.namesrv_addr_list.as_ref();
683
684        if addr_list.is_empty() {
685            debug!("No nameservers configured, skipping availability scan");
686            return;
687        }
688
689        // Cleanup - Remove stale entries ===
690        // Collect addresses to remove (avoid holding borrow during mutation)
691        let stale_addrs: Vec<CheetahString> = self
692            .available_namesrv_addr_set
693            .as_ref()
694            .iter()
695            .filter(|addr| !addr_list.contains(addr))
696            .cloned()
697            .collect();
698
699        for stale_addr in stale_addrs {
700            warn!(
701                "Removing stale nameserver from available set: {}",
702                stale_addr
703            );
704            self.available_namesrv_addr_set
705                .mut_from_ref()
706                .remove(&stale_addr);
707        }
708
709        // Parallel probe all configured nameservers ===
710        // Parallel probing reduces scan time from O(N * 50ms) to O(max(50ms))
711        use futures::future::join_all;
712
713        let probe_futures: Vec<_> = addr_list
714            .iter()
715            .map(|addr| {
716                let addr_clone = addr.clone();
717                async move {
718                    let result = self.get_and_create_client(Some(&addr_clone)).await;
719                    (addr_clone, result.is_some())
720                }
721            })
722            .collect();
723
724        // Execute all probes concurrently
725        let results = join_all(probe_futures).await;
726
727        // Update availability set based on probe results
728        for (namesrv_addr, is_available) in results {
729            if is_available {
730                // Connection successful - mark as available
731                if self
732                    .available_namesrv_addr_set
733                    .mut_from_ref()
734                    .insert(namesrv_addr.clone())
735                {
736                    info!("Nameserver {} is now available", namesrv_addr);
737                }
738            } else {
739                // Connection failed - mark as unavailable
740                if self
741                    .available_namesrv_addr_set
742                    .mut_from_ref()
743                    .remove(&namesrv_addr)
744                {
745                    warn!("Nameserver {} is now unavailable", namesrv_addr);
746                }
747            }
748        }
749
750        debug!(
751            "Availability scan complete: {}/{} nameservers available",
752            self.available_namesrv_addr_set.as_ref().len(),
753            addr_list.len()
754        );
755    }
756}
757
758#[allow(unused_variables)]
759impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR> {
760    async fn start(&self, this: WeakArcMut<Self>) {
761        if let Some(client) = this.upgrade() {
762            let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
763            self.client_runtime
764                .as_ref()
765                .unwrap()
766                .get_handle()
767                .spawn(async move {
768                    loop {
769                        client.scan_available_name_srv().await;
770                        time::sleep(Duration::from_millis(connect_timeout_millis)).await;
771                    }
772                });
773        }
774    }
775
776    fn shutdown(&mut self) {
777        if let Some(rt) = self.client_runtime.take() {
778            rt.shutdown();
779        }
780        // DashMap::clear() is already thread-safe, no need for async lock
781        self.connection_tables.clear();
782        self.namesrv_addr_list.clear();
783        self.available_namesrv_addr_set.clear();
784
785        info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<");
786    }
787
788    fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>) {
789        self.cmd_handler.register_rpc_hook(hook);
790    }
791
792    fn clear_rpc_hook(&mut self) {
793        todo!()
794    }
795}
796
797#[allow(unused_variables)]
798impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqDefaultClient<PR> {
799    async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>) {
800        let old = self.namesrv_addr_list.mut_from_ref();
801        let mut update = false;
802
803        if !addrs.is_empty() {
804            if old.is_empty() || addrs.len() != old.len() {
805                update = true;
806            } else {
807                for addr in &addrs {
808                    if !old.contains(addr) {
809                        update = true;
810                        break;
811                    }
812                }
813            }
814
815            if update {
816                // Shuffle the addresses
817                // Shuffle logic is not implemented here as it is not available in standard library
818                // You can implement it using various algorithms like Fisher-Yates shuffle
819
820                info!(
821                    "name remoting_server address updated. NEW : {:?} , OLD: {:?}",
822                    addrs, old
823                );
824                /* let mut rng = thread_rng();
825                addrs.shuffle(&mut rng);*/
826                self.namesrv_addr_list.mut_from_ref().extend(addrs.clone());
827
828                // should close the channel if choosed addr is not exist.
829                if let Some(namesrv_addr) = self.namesrv_addr_choosed.as_ref() {
830                    if !addrs.contains(namesrv_addr) {
831                        // DashMap allows direct removal without collecting
832                        self.connection_tables.remove(namesrv_addr);
833                    }
834                }
835            }
836        }
837    }
838
839    fn get_name_server_address_list(&self) -> &[CheetahString] {
840        self.namesrv_addr_list.as_ref()
841    }
842
843    fn get_available_name_srv_list(&self) -> Vec<CheetahString> {
844        self.available_namesrv_addr_set
845            .as_ref()
846            .clone()
847            .into_iter()
848            .collect()
849    }
850
851    /// Send request and wait for response with timeout.
852    ///
853    /// # HOT PATH
854    /// This is the primary client API - optimize for low latency and high throughput.
855    ///
856    /// # Flow
857    /// ```text
858    /// 1. Get/create client connection         (~100ns fast path, ~50ms slow)
859    /// 2. Spawn send task on runtime           (~10μs)
860    /// 3. Apply timeout wrapper                (~100ns)
861    /// 4. Wait for response via oneshot        (network RTT + processing)
862    /// 5. Unwrap nested Result layers          (~10ns)
863    /// ```
864    ///
865    /// # Performance Optimizations
866    ///
867    /// - **Early validation**: Check client availability before expensive spawn
868    /// - **Flat error handling**: Reduce nested `match` overhead
869    /// - **Direct await**: Spawn only for timeout enforcement (could be optimized further)
870    ///
871    /// # Error Handling
872    ///
873    /// Returns `RocketmqError::RemoteError` for all failures:
874    /// - Client unavailable (no connection)
875    /// - Network I/O error (send/recv failure)
876    /// - Timeout (no response within deadline)
877    /// - Task spawn error (runtime shutdown)
878    ///
879    /// # Arguments
880    ///
881    /// * `addr` - Target address (None = use nameserver)
882    /// * `request` - Command to send
883    /// * `timeout_millis` - Max wait time for response
884    ///
885    /// # Examples
886    ///
887    /// ```rust,ignore
888    /// # use rocketmq_remoting::clients::RocketmqDefaultClient;
889    /// # use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
890    /// # async fn example(client: &RocketmqDefaultClient) -> Result<(), Box<dyn std::error::Error>> {
891    /// let request = RemotingCommand::create_request_command(/* ... */);
892    /// let response = client.invoke_request(
893    ///     Some(&"127.0.0.1:10911".into()),
894    ///     request,
895    ///     3000 // 3 second timeout
896    /// ).await?;
897    /// # Ok(())
898    /// # }
899    /// ```
900    async fn invoke_request(
901        &self,
902        addr: Option<&CheetahString>,
903        request: RemotingCommand,
904        timeout_millis: u64,
905    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
906        // Record start time for latency tracking
907        let start = time::Instant::now();
908
909        // Determine target address (for metrics recording)
910        let target_addr = addr
911            .cloned()
912            .or_else(|| self.namesrv_addr_choosed.as_ref().clone());
913
914        // === Get client connection ===
915        let mut client = self.get_and_create_client(addr).await.ok_or_else(|| {
916            let target = addr.map(|a| a.as_str()).unwrap_or("<nameserver>");
917            error!("Failed to get client for {}", target);
918
919            // Record connection error
920            if let Some(ref addr) = target_addr {
921                self.latency_tracker.record_error(addr);
922            }
923
924            rocketmq_error::RocketMQError::network_connection_failed(
925                target.to_string(),
926                "Failed to connect",
927            )
928        })?;
929
930        // === Send request with timeout ===
931        let runtime = self.client_runtime.as_ref().ok_or_else(|| {
932            error!("Client runtime has been shut down");
933            rocketmq_error::RocketMQError::ClientNotStarted
934        })?;
935
936        let send_task = runtime.get_handle().spawn(async move {
937            // Apply timeout at the send_read level
938            match time::timeout(
939                Duration::from_millis(timeout_millis),
940                client.send_read(request, timeout_millis),
941            )
942            .await
943            {
944                Ok(result) => result,
945                Err(_) => Err(rocketmq_error::RocketMQError::Timeout {
946                    operation: "send_request",
947                    timeout_ms: timeout_millis,
948                }),
949            }
950        });
951
952        // === Await result and record metrics ===
953        match send_task.await {
954            Ok(send_result) => {
955                let latency = start.elapsed();
956
957                match send_result {
958                    Ok(response) => {
959                        // Record successful request latency
960                        if let Some(ref addr) = target_addr {
961                            let latency_ms = latency.as_millis() as u64;
962                            self.latency_tracker.record_success(addr, latency);
963
964                            // Record in connection pool metrics (if enabled)
965                            if let Some(ref pool) = self.connection_pool {
966                                pool.record_success(addr, latency_ms);
967                            }
968
969                            debug!("Request to {} completed in {:?}", addr, latency);
970                        }
971                        Ok(response)
972                    }
973                    Err(err) => {
974                        // Record error
975                        if let Some(ref addr) = target_addr {
976                            self.latency_tracker.record_error(addr);
977
978                            // Record in connection pool metrics (if enabled)
979                            if let Some(ref pool) = self.connection_pool {
980                                pool.record_error(addr);
981                            }
982
983                            warn!("Request to {} failed after {:?}: {:?}", addr, latency, err);
984                        }
985                        Err(err)
986                    }
987                }
988            }
989            Err(join_err) => {
990                // Task panic or cancellation
991                error!("Send task failed: {:?}", join_err);
992
993                // Record error
994                if let Some(ref addr) = target_addr {
995                    self.latency_tracker.record_error(addr);
996                }
997
998                Err(rocketmq_error::RocketMQError::Internal(format!(
999                    "Send task error: {}",
1000                    join_err
1001                )))
1002            }
1003        }
1004    }
1005
1006    async fn invoke_request_oneway(
1007        &self,
1008        addr: &CheetahString,
1009        request: RemotingCommand,
1010        timeout_millis: u64,
1011    ) {
1012        let client = self.get_and_create_client(Some(addr)).await;
1013        match client {
1014            None => {
1015                error!("get client failed");
1016            }
1017            Some(mut client) => {
1018                self.client_runtime
1019                    .as_ref()
1020                    .unwrap()
1021                    .get_handle()
1022                    .spawn(async move {
1023                        match time::timeout(Duration::from_millis(timeout_millis), async move {
1024                            let mut request = request;
1025                            request.mark_oneway_rpc_ref();
1026                            client.send(request).await
1027                        })
1028                        .await
1029                        {
1030                            Ok(_) => Ok::<(), rocketmq_error::RocketMQError>(()),
1031                            Err(_) => Err(rocketmq_error::RocketMQError::Timeout {
1032                                operation: "send_oneway",
1033                                timeout_ms: timeout_millis,
1034                            }),
1035                        }
1036                    });
1037            }
1038        }
1039    }
1040
1041    fn is_address_reachable(&mut self, addr: &CheetahString) {
1042        todo!()
1043    }
1044
1045    fn close_clients(&mut self, addrs: Vec<String>) {
1046        todo!()
1047    }
1048
1049    fn register_processor(&mut self, processor: impl RequestProcessor + Sync) {
1050        todo!()
1051    }
1052}
1053
1054fn init_value_index() -> i32 {
1055    let mut rng = rand::rng();
1056    rng.random_range(0..999)
1057}