pub struct ConnectionManager { /* private fields */ }
Production-grade connection manager with advanced pooling, multiplexing, and resilience features.
The ConnectionManager provides a comprehensive solution for managing network connections with:
- Advanced connection pooling with lifecycle management
- HTTP/2-style multiplexing for efficient connection usage
- Retry logic with exponential backoff and jitter
- Circuit breaker pattern for fault tolerance
- Health checks and connection quality monitoring
- Connection load balancing and request distribution
- Performance monitoring and metrics collection
- Automatic resource cleanup and garbage collection
- Back pressure handling for overload protection
- Connection warming and preemptive scaling
- Request multiplexing and stream management
§Production Features
- Zero-downtime connection pool updates
- Graceful degradation under load
- Automatic connection warming
- Request routing and load balancing
- Connection affinity and session persistence
- Comprehensive observability and monitoring
- Memory-efficient connection reuse
- Adaptive connection limits based on system resources
§Multiplexing Support
- Stream-based connection multiplexing
- Request prioritization and queuing
- Concurrent request handling
- Flow control and backpressure
- Stream lifecycle management
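To illustrate request prioritization and queuing, here is a minimal sketch of a priority queue built on the standard library's `BinaryHeap`. The `Priority` variants, `QueuedRequest`, and `RequestQueue` are hypothetical names chosen for this example; the crate's own `Priority` type and its internal queue may be shaped differently.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Hypothetical priority levels (assumption: the crate's `Priority` may differ).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Low,
    Normal,
    High,
}

/// A queued request tagged with its priority and arrival order.
#[derive(Debug, PartialEq, Eq)]
pub struct QueuedRequest {
    pub priority: Priority,
    pub seq: u64,
    pub payload: Vec<u8>,
}

impl Ord for QueuedRequest {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority wins; within one priority the older request
        // (lower sequence number) wins, which keeps ordering FIFO.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}

impl PartialOrd for QueuedRequest {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Max-heap of pending requests.
#[derive(Default)]
pub struct RequestQueue {
    heap: BinaryHeap<QueuedRequest>,
    next_seq: u64,
}

impl RequestQueue {
    /// Enqueues a payload and stamps it with the next sequence number.
    pub fn push(&mut self, priority: Priority, payload: Vec<u8>) {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.heap.push(QueuedRequest { priority, seq, payload });
    }

    /// Removes the highest-priority, oldest request, if any.
    pub fn pop(&mut self) -> Option<QueuedRequest> {
        self.heap.pop()
    }
}
```

With this ordering, two `High` requests are popped in arrival order before any `Normal` or `Low` request.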
§High-performance connection manager with pooling, metrics tracking and back pressure handling.
The ConnectionManager provides a comprehensive solution for managing network connections with:
- Connection pooling with configurable TTL
- Efficient concurrent connection tracking
- Detailed performance metrics collection
- Automatic resource cleanup
- Back pressure handling for overload protection
- Health monitoring and auto-recovery
- Circuit breaker pattern for failing connections
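Back pressure handling can be pictured with a bounded queue that refuses new work when full instead of growing without limit. The sketch below uses the standard library's `sync_channel`; the `Enqueue` enum and the `bounded_queue` and `try_enqueue` helpers are hypothetical names for illustration and say nothing about how the manager implements this internally.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of trying to enqueue a message under back pressure.
#[derive(Debug, PartialEq, Eq)]
pub enum Enqueue {
    /// The message was buffered.
    Accepted,
    /// The queue is full; the caller should slow down, retry later, or shed load.
    Overloaded,
    /// The consumer side has been dropped.
    Closed,
}

/// Creates a bounded queue holding at most `capacity` messages.
pub fn bounded_queue(capacity: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
    sync_channel(capacity)
}

/// Non-blocking enqueue that reports overload instead of waiting.
pub fn try_enqueue(tx: &SyncSender<Vec<u8>>, msg: Vec<u8>) -> Enqueue {
    match tx.try_send(msg) {
        Ok(()) => Enqueue::Accepted,
        Err(TrySendError::Full(_)) => Enqueue::Overloaded,
        Err(TrySendError::Disconnected(_)) => Enqueue::Closed,
    }
}
```

A producer that receives `Enqueue::Overloaded` learns about the overload immediately, which is the signal a back pressure scheme needs.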
§Performance Features
- Lock-free concurrent data structures
- Connection pooling reduces setup overhead
- Batched status updates
- Efficient metrics collection
- Adaptive connection limits based on system resources
§Connection Pool Management
- Automatic connection reuse
- TTL-based expiration
- Configurable pool size
- Proactive cleanup of expired connections
- Health-based connection scoring
§Health Monitoring
- Periodic health checks
- Connection quality scoring
- Automatic failover
- Circuit breaker for unreliable peers
- Performance-based connection prioritization
§Metrics Tracking
- Queue metrics (size, utilization)
- Latency metrics (average, peak)
- Throughput metrics (messages/second)
- Connection pool statistics
- Health and reliability metrics
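The metric families listed above boil down to a few small calculations. The following sketch shows one way to compute them; `LatencyStats`, `queue_utilization`, and `throughput` are hypothetical helpers and are not the fields or methods of the crate's `QueueMetrics`, `LatencyMetrics`, or `ThroughputMetrics` types.

```rust
use std::time::Duration;

/// Running latency statistics: average and peak.
#[derive(Debug, Default)]
pub struct LatencyStats {
    count: u64,
    total: Duration,
    peak: Duration,
}

impl LatencyStats {
    /// Records one latency sample.
    pub fn record(&mut self, sample: Duration) {
        self.count += 1;
        self.total += sample;
        if sample > self.peak {
            self.peak = sample;
        }
    }

    /// Mean latency, or `None` before the first sample.
    pub fn average(&self) -> Option<Duration> {
        if self.count == 0 {
            return None;
        }
        let nanos = self.total.as_nanos() / u128::from(self.count);
        Some(Duration::from_nanos(nanos as u64))
    }

    /// Largest sample seen so far.
    pub fn peak(&self) -> Duration {
        self.peak
    }
}

/// Fraction of the queue in use, in the range 0.0..=1.0 for len <= capacity.
pub fn queue_utilization(len: usize, capacity: usize) -> f64 {
    if capacity == 0 {
        0.0
    } else {
        len as f64 / capacity as f64
    }
}

/// Messages per second over a measurement window.
pub fn throughput(messages: u64, window: Duration) -> f64 {
    let secs = window.as_secs_f64();
    if secs == 0.0 {
        0.0
    } else {
        messages as f64 / secs
    }
}
```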
§Example
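let manager = ConnectionManager::new(100); // 100 max connections
manager.connect(peer_id).await?; // takes PeerId by value; reusing peer_id below assumes PeerId is Copy
let status = manager.get_status(&peer_id); // not async: returns Option<ConnectionStatus>
let metrics = manager.get_metrics(); // not async: returns NetworkMetrics
Implementations§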
impl ConnectionManager
pub async fn recover_connection(&self, peer_id: &PeerId) -> Result<(), NetworkError>
Recovers from connection failures by attempting reconnection
pub async fn health_check(&self) -> Result<Vec<PeerId>, NetworkError>
Performs health check on all active connections
pub async fn auto_recover(&self) -> Result<usize, NetworkError>
Automatically recovers unhealthy connections
pub fn new(max_connections: usize) -> Self
Creates a new connection manager with default pool timeout (5 minutes).
The manager initializes with optimized default settings:
- 5 minute connection pool TTL
- Lock-free concurrent connection tracking
- Comprehensive metrics collection
- Health monitoring and circuit breakers
§Arguments
max_connections - Maximum number of concurrent connections to maintain
§Performance Considerations
- Choose max_connections based on system resources
- Connection pooling reduces setup overhead
- Metrics collection has minimal overhead
- Health monitoring provides proactive issue detection
pub fn with_pool_timeout(max_connections: usize, pool_timeout: Duration) -> Self
Creates a new connection manager with enhanced features and custom pool timeout.
Allows fine-tuning of connection pooling behavior:
- Custom TTL for pooled connections
- Connection reuse optimization
- Resource usage control
- Enhanced health monitoring
§Arguments
max_connections - Maximum number of concurrent connections
pool_timeout - Time-to-live for pooled connections
§Connection Pool Behavior
- Connections are cached until timeout
- Expired connections are automatically cleaned up
- Pool size limited by max_connections
- Health-based connection scoring and prioritization
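The pool behavior described above (cache until timeout, clean up expired entries, cap the size) can be sketched with a small TTL map. `TtlPool` and its methods are hypothetical, peers are represented by plain `u64` ids, and the current time is passed in explicitly so the behavior is easy to test; none of this reflects the manager's actual internals.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Minimal TTL-bounded pool keyed by a peer id (assumption: `u64` stands in for `PeerId`).
pub struct TtlPool {
    ttl: Duration,
    max_entries: usize,
    entries: HashMap<u64, Instant>, // peer id -> time the connection was pooled
}

impl TtlPool {
    pub fn new(max_entries: usize, ttl: Duration) -> Self {
        TtlPool { ttl, max_entries, entries: HashMap::new() }
    }

    /// Pools a connection for `peer`. Returns `false` when the pool is full.
    pub fn insert(&mut self, peer: u64, now: Instant) -> bool {
        self.evict_expired(now);
        if self.entries.len() >= self.max_entries && !self.entries.contains_key(&peer) {
            return false;
        }
        self.entries.insert(peer, now);
        true
    }

    /// Takes the pooled connection for `peer` out of the pool.
    /// Returns `true` only if one existed and is still within its TTL.
    pub fn take(&mut self, peer: u64, now: Instant) -> bool {
        match self.entries.remove(&peer) {
            Some(pooled_at) => now.duration_since(pooled_at) < self.ttl,
            None => false,
        }
    }

    /// Drops every entry older than the TTL and returns how many were removed.
    pub fn evict_expired(&mut self, now: Instant) -> usize {
        let before = self.entries.len();
        let ttl = self.ttl;
        self.entries.retain(|_, pooled_at| now.duration_since(*pooled_at) < ttl);
        before - self.entries.len()
    }

    /// Number of pooled entries, including any not yet evicted.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Calling `TtlPool::new(100, Duration::from_secs(300))` mirrors the documented defaults of 100 connections and a 5 minute TTL.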
pub async fn connect(&self, peer_id: PeerId) -> Result<(), NetworkError>
Initiates a connection to a peer with automatic pooling and reuse.
Enhanced connection establishment process:
- Check circuit breaker status
- Check pool for existing healthy connection
- Reuse if valid connection exists
- Create new connection if needed
- Apply connection limits and health checks
- Initialize health monitoring
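The ordering of these steps can be expressed as a small decision function. This is only a sketch of the listed order: `ConnectDecision` and `decide_connect` are hypothetical, and whether a reused connection counts against the limit is an assumption made here, not something the documentation states.

```rust
/// Possible outcomes of the connection-establishment checks.
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectDecision {
    /// The circuit breaker for this peer is open.
    RejectCircuitOpen,
    /// A healthy pooled connection exists and is reused.
    ReusePooled,
    /// No pooled connection and the connection limit is reached.
    RejectAtLimit,
    /// A new connection should be created.
    CreateNew,
}

/// Applies the checks in the documented order: breaker, pool, limit, create.
pub fn decide_connect(
    circuit_open: bool,
    pooled_healthy: bool,
    active: usize,
    max_connections: usize,
) -> ConnectDecision {
    if circuit_open {
        return ConnectDecision::RejectCircuitOpen;
    }
    if pooled_healthy {
        // Assumption: reuse does not consume an additional connection slot.
        return ConnectDecision::ReusePooled;
    }
    if active >= max_connections {
        return ConnectDecision::RejectAtLimit;
    }
    ConnectDecision::CreateNew
}
```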
§Arguments
peer_id - ID of the peer to connect to
§Connection Pooling
- Reuses healthy connections when possible
- Validates connection freshness and quality
- Removes expired or unhealthy connections
- Updates usage metrics and health scores
§Circuit Breaker Protection
- Prevents connections to repeatedly failing peers
- Implements exponential backoff
- Automatic recovery testing
§Returns
Ok(()) - Connection established or reused
Err(_) - Connection failed or circuit breaker open
pub fn update_status(&self, peer_id: PeerId, status: ConnectionStatus)
Updates connection status for a peer with lock-free atomic guarantees.
Enhanced status update process:
- Update connection info with new status
- Update health and quality metrics
- Atomic metrics update
- Circuit breaker state management
- Event logging and monitoring
§Arguments
peer_id - ID of the peer to update
status - New connection status
The response_time (optional response time for quality calculation) and bytes_transferred (bytes transferred for bandwidth tracking) values are not parameters of this method; they are accepted by update_status_with_metrics.
§Thread Safety
- Status updates are lock-free and atomic
- Metrics updates use parking_lot for better performance
- Safe for concurrent access with minimal contention
§Health Tracking
Updates connection health scores, quality metrics, and circuit breaker states to ensure optimal connection management.
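As an illustration of what a lock-free, atomic status update can look like, the sketch below stores a status in a single `AtomicU8` and keeps a shared count of connected peers in an `AtomicUsize`. The `Status` enum and `PeerStatus` type are hypothetical stand-ins; the crate's `ConnectionStatus` and its concurrent data structures are not shown in this documentation and may work differently.

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

/// Hypothetical status values (assumption: `ConnectionStatus` may differ).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    Disconnected = 0,
    Connecting = 1,
    Connected = 2,
    Failed = 3,
}

impl Status {
    fn from_u8(raw: u8) -> Status {
        match raw {
            1 => Status::Connecting,
            2 => Status::Connected,
            3 => Status::Failed,
            _ => Status::Disconnected,
        }
    }
}

/// Per-peer status held in one atomic byte, so reads and writes never take a lock.
pub struct PeerStatus {
    raw: AtomicU8,
}

impl PeerStatus {
    pub fn new() -> Self {
        PeerStatus { raw: AtomicU8::new(Status::Disconnected as u8) }
    }

    /// Reads the current status.
    pub fn load(&self) -> Status {
        Status::from_u8(self.raw.load(Ordering::Acquire))
    }

    /// Atomically stores `new`, adjusts the shared `connected` counter when the
    /// peer enters or leaves the Connected state, and returns the previous status.
    pub fn update(&self, new: Status, connected: &AtomicUsize) -> Status {
        let old = Status::from_u8(self.raw.swap(new as u8, Ordering::AcqRel));
        match (old == Status::Connected, new == Status::Connected) {
            (false, true) => {
                connected.fetch_add(1, Ordering::Relaxed);
            }
            (true, false) => {
                connected.fetch_sub(1, Ordering::Relaxed);
            }
            _ => {}
        }
        old
    }
}
```

Because `swap` hands each caller a distinct previous value, concurrent updates to the same peer still leave the counter consistent.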
pub fn update_status_with_metrics(&self, peer_id: PeerId, status: ConnectionStatus, response_time: Option<Duration>, bytes_transferred: u64)
Updates connection status with detailed performance metrics
pub fn disconnect(&self, peer_id: &PeerId)
Disconnects from a peer with enhanced cleanup and health tracking
pub fn connection_count(&self) -> usize
Returns connection count (lock-free)
pub fn get_status(&self, peer_id: &PeerId) -> Option<ConnectionStatus>
Returns connection status for a peer (lock-free)
pub fn get_connection_info(&self, peer_id: &PeerId) -> Option<ConnectionInfo>
Returns detailed connection information for a peer (lock-free)
pub fn get_quality_score(&self, peer_id: &PeerId) -> Option<f64>
Get connection quality score for a peer
pub fn get_circuit_breaker_state(&self, peer_id: &PeerId) -> Option<CircuitBreakerState>
Get circuit breaker state for a peer
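For readers unfamiliar with the pattern, a circuit breaker is usually a three-state machine: closed (calls allowed), open (calls rejected), and half-open (a trial call allowed after a cooldown). The sketch below is a generic version of that pattern. `BreakerState`, `CircuitBreaker`, the failure threshold, and the cooldown are all hypothetical; the variants of the crate's `CircuitBreakerState` are not listed in this documentation.

```rust
use std::time::{Duration, Instant};

/// Generic breaker states (assumption: not necessarily the crate's variants).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
    Closed,
    Open,
    HalfOpen,
}

pub struct CircuitBreaker {
    state: BreakerState,
    failures: u32,
    failure_threshold: u32,
    cooldown: Duration,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        CircuitBreaker {
            state: BreakerState::Closed,
            failures: 0,
            failure_threshold,
            cooldown,
            opened_at: None,
        }
    }

    /// Returns whether a call may proceed at `now`.
    /// An open breaker moves to half-open once the cooldown has elapsed.
    pub fn allow(&mut self, now: Instant) -> bool {
        if self.state == BreakerState::Open {
            if let Some(opened) = self.opened_at {
                if now.duration_since(opened) >= self.cooldown {
                    self.state = BreakerState::HalfOpen;
                }
            }
        }
        self.state != BreakerState::Open
    }

    /// A success closes the breaker and clears the failure count.
    pub fn record_success(&mut self) {
        self.state = BreakerState::Closed;
        self.failures = 0;
        self.opened_at = None;
    }

    /// A failure in half-open state, or reaching the threshold, opens the breaker.
    pub fn record_failure(&mut self, now: Instant) {
        self.failures += 1;
        if self.state == BreakerState::HalfOpen || self.failures >= self.failure_threshold {
            self.state = BreakerState::Open;
            self.opened_at = Some(now);
        }
    }

    pub fn state(&self) -> BreakerState {
        self.state
    }
}
```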
pub fn get_healthy_connections(&self) -> Vec<(PeerId, f64)>
Get all healthy connections sorted by quality score
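A result of this shape can be produced by filtering on a score threshold and sorting in descending order. The sketch below assumes that higher scores are better, that "healthy" means a score at or above a caller-supplied minimum, and that `u64` stands in for `PeerId`; `healthy_sorted` is a hypothetical helper, and the sort direction used by the real method is not stated in this documentation.

```rust
/// Keeps peers whose score is at least `min_score` and sorts them best-first.
/// NaN scores are treated as unhealthy and dropped.
pub fn healthy_sorted(peers: &[(u64, f64)], min_score: f64) -> Vec<(u64, f64)> {
    let mut healthy: Vec<(u64, f64)> = peers
        .iter()
        .copied()
        .filter(|(_, score)| !score.is_nan() && *score >= min_score)
        .collect();
    // `total_cmp` gives a total order on f64, so the comparator never panics.
    healthy.sort_by(|a, b| b.1.total_cmp(&a.1));
    healthy
}
```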
pub fn update_metrics(&self, messages_per_second: f64, avg_latency_ms: u64)
Updates network metrics with optimized locking
pub fn get_queue_metrics(&self) -> QueueMetrics
Get current queue metrics
pub fn get_latency_metrics(&self) -> LatencyMetrics
Get current latency metrics
pub fn get_throughput_metrics(&self) -> ThroughputMetrics
Get current throughput metrics
pub fn get_metrics(&self) -> NetworkMetrics
Returns current network metrics (optimized)
pub async fn open_stream(&self, peer_id: PeerId, priority: Priority) -> Result<StreamId, NetworkError>
Open a multiplexed stream on a connection. This is the first of the enhanced API methods for production features.
pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError>
Close a multiplexed stream
pub async fn send_stream_data(&self, stream_id: StreamId, data: Bytes) -> Result<(), NetworkError>
Send data on a specific stream
pub async fn retry_connect(&self, peer_id: PeerId) -> Result<(), NetworkError>
Execute connection operation with retry logic
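The struct-level documentation mentions retry logic with exponential backoff and jitter. A minimal synchronous sketch of that technique follows. `backoff_delay` and `retry_with_backoff` are hypothetical helpers, the jitter source and the sleep function are injected by the caller so the example stays deterministic, and the actual retry policy of `retry_connect` (attempt count, base delay, cap) is not documented here.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`,
/// then scaled by `jitter`, a random fraction in 0.0..=1.0 ("full jitter").
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration, jitter: f64) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let capped = base.checked_mul(factor).unwrap_or(max).min(max);
    // Guard against NaN or out-of-range input, which `mul_f64` would reject.
    let fraction = if jitter.is_finite() { jitter.clamp(0.0, 1.0) } else { 1.0 };
    capped.mul_f64(fraction)
}

/// Runs `op` until it succeeds or `max_attempts` tries have been made
/// (always at least one), sleeping with exponential backoff between tries.
pub fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut jitter: impl FnMut() -> f64,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error.
            Err(err) if attempt + 1 >= max_attempts => return Err(err),
            Err(_) => {
                sleep(backoff_delay(attempt, base, max, jitter()));
                attempt += 1;
            }
        }
    }
}
```

In real use the `jitter` closure would draw from a random number generator and `sleep` would call the runtime's timer; both are left to the caller here.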
pub async fn select_best_connection(&self, available_peers: &[PeerId]) -> Option<PeerId>
Select best connection using load balancer
pub async fn start_health_monitoring(&self, peer_id: PeerId)
Start health monitoring for a peer
pub async fn check_connection_health(&self, peer_id: PeerId) -> Option<HealthCheckResult>
Perform health check on a connection
pub async fn warm_connections(&self, peer_id: PeerId) -> Result<(), NetworkError>
Warm up connections for a peer
pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState
Get warming state for a peer
pub async fn get_connection_statistics(&self) -> ConnectionStatistics
Get comprehensive connection statistics
pub fn set_connection_limits(&mut self, limits: ConnectionLimits)
Configure connection limits
pub fn get_connection_limits(&self) -> &ConnectionLimits
Get current connection limits
Auto Trait Implementations§
impl Freeze for ConnectionManager
impl !RefUnwindSafe for ConnectionManager
impl Send for ConnectionManager
impl Sync for ConnectionManager
impl Unpin for ConnectionManager
impl !UnwindSafe for ConnectionManager
Blanket Implementations§
impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where
    T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where
    T: 'a,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.