Struct ConnectionManager

Source
pub struct ConnectionManager { /* private fields */ }

Production-grade connection manager with advanced pooling, multiplexing, and resilience features.

The ConnectionManager provides a comprehensive solution for managing network connections with:

  • Advanced connection pooling with lifecycle management
  • HTTP/2-style multiplexing for efficient connection usage
  • Retry logic with exponential backoff and jitter
  • Circuit breaker pattern for fault tolerance
  • Health checks and connection quality monitoring
  • Connection load balancing and request distribution
  • Performance monitoring and metrics collection
  • Automatic resource cleanup and garbage collection
  • Back pressure handling for overload protection
  • Connection warming and preemptive scaling
  • Request multiplexing and stream management

§Production Features

  • Zero-downtime connection pool updates
  • Graceful degradation under load
  • Automatic connection warming
  • Request routing and load balancing
  • Connection affinity and session persistence
  • Comprehensive observability and monitoring
  • Memory-efficient connection reuse
  • Adaptive connection limits based on system resources

§Multiplexing Support

  • Stream-based connection multiplexing
  • Request prioritization and queuing
  • Concurrent request handling
  • Flow control and backpressure
  • Stream lifecycle management

§High-performance connection manager with pooling, metrics tracking and back pressure handling.

The ConnectionManager provides a comprehensive solution for managing network connections with:

  • Connection pooling with configurable TTL
  • Efficient concurrent connection tracking
  • Detailed performance metrics collection
  • Automatic resource cleanup
  • Back pressure handling for overload protection
  • Health monitoring and auto-recovery
  • Circuit breaker pattern for failing connections

§Performance Features

  • Lock-free concurrent data structures
  • Connection pooling reduces setup overhead
  • Batched status updates
  • Efficient metrics collection
  • Adaptive connection limits based on system resources

§Connection Pool Management

  • Automatic connection reuse
  • TTL-based expiration
  • Configurable pool size
  • Proactive cleanup of expired connections
  • Health-based connection scoring

§Health Monitoring

  • Periodic health checks
  • Connection quality scoring
  • Automatic failover
  • Circuit breaker for unreliable peers
  • Performance-based connection prioritization

§Metrics Tracking

  • Queue metrics (size, utilization)
  • Latency metrics (average, peak)
  • Throughput metrics (messages/second)
  • Connection pool statistics
  • Health and reliability metrics
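As an illustration of the average and peak latency figures listed above, here is a minimal sketch of a tracker that could produce them. LatencySketch and its methods are hypothetical names chosen for this example; the crate's LatencyMetrics type is not shown on this page and may be computed differently.

```rust
/// Hypothetical latency tracker; not the crate's `LatencyMetrics` type.
#[derive(Debug, Default)]
pub struct LatencySketch {
    samples: u64,
    total_ms: u64,
    peak_ms: u64,
}

impl LatencySketch {
    /// Records one latency sample in milliseconds.
    pub fn record(&mut self, latency_ms: u64) {
        self.samples += 1;
        self.total_ms = self.total_ms.saturating_add(latency_ms);
        self.peak_ms = self.peak_ms.max(latency_ms);
    }

    /// Mean latency over all samples, or `None` before the first sample.
    pub fn average_ms(&self) -> Option<f64> {
        if self.samples == 0 {
            None
        } else {
            Some(self.total_ms as f64 / self.samples as f64)
        }
    }

    /// Largest latency recorded so far (0 before the first sample).
    pub fn peak_ms(&self) -> u64 {
        self.peak_ms
    }
}
```

Keeping a running sum and a running maximum means each update is constant time and no sample history has to be stored.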

§Example

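let manager = ConnectionManager::new(100); // 100 max connections
manager.connect(peer_id).await?; // async; errors if the connection fails or the circuit breaker is open
let status = manager.get_status(&peer_id); // synchronous; returns Option<ConnectionStatus>
let metrics = manager.get_metrics(); // synchronous; returns NetworkMetrics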

Implementations§

Source§

impl ConnectionManager

Source

pub async fn recover_connection( &self, peer_id: &PeerId, ) -> Result<(), NetworkError>

Recovers from connection failures by attempting reconnection

Source

pub async fn health_check(&self) -> Result<Vec<PeerId>, NetworkError>

Performs health check on all active connections

Source

pub async fn auto_recover(&self) -> Result<usize, NetworkError>

Automatically recovers unhealthy connections

Source

pub fn new(max_connections: usize) -> Self

Creates a new connection manager with default pool timeout (5 minutes).

The manager initializes with optimized default settings:

  • 5 minute connection pool TTL
  • Lock-free concurrent connection tracking
  • Comprehensive metrics collection
  • Health monitoring and circuit breakers

§Arguments

  • max_connections - Maximum number of concurrent connections to maintain

§Performance Considerations

  • Choose max_connections based on system resources
  • Connection pooling reduces setup overhead
  • Metrics collection has minimal overhead
  • Health monitoring provides proactive issue detection
Source

pub fn with_pool_timeout(max_connections: usize, pool_timeout: Duration) -> Self

Creates a new connection manager with enhanced features and custom pool timeout.

Allows fine-tuning of connection pooling behavior:

  • Custom TTL for pooled connections
  • Connection reuse optimization
  • Resource usage control
  • Enhanced health monitoring

§Arguments

  • max_connections - Maximum number of concurrent connections
  • pool_timeout - Time-to-live for pooled connections

§Connection Pool Behavior

  • Connections are cached until timeout
  • Expired connections are automatically cleaned up
  • Pool size is limited by max_connections
  • Health-based connection scoring and prioritization
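The pool behavior described above (cache until timeout, clean up expired entries, cap the pool size) can be sketched with the standard library alone. This is only an illustration under stated assumptions: TtlPoolSketch, its u64 peer ids, and the explicit now parameter are hypothetical and do not reflect the crate's internal pool.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical pool keyed by a `u64` peer id; not the crate's real pool.
pub struct TtlPoolSketch {
    ttl: Duration,
    max_entries: usize,
    // Maps a peer id to the instant its connection was pooled.
    entries: HashMap<u64, Instant>,
}

impl TtlPoolSketch {
    pub fn new(max_entries: usize, ttl: Duration) -> Self {
        Self { ttl, max_entries, entries: HashMap::new() }
    }

    /// Pools a connection at time `now`; returns false if the pool is full.
    pub fn insert(&mut self, peer: u64, now: Instant) -> bool {
        self.evict_expired(now);
        if self.entries.len() >= self.max_entries && !self.entries.contains_key(&peer) {
            return false;
        }
        self.entries.insert(peer, now);
        true
    }

    /// Returns true if a non-expired connection for `peer` is pooled.
    pub fn is_fresh(&self, peer: u64, now: Instant) -> bool {
        self.entries
            .get(&peer)
            .map_or(false, |pooled_at| now.duration_since(*pooled_at) < self.ttl)
    }

    /// Drops every entry whose age has reached the TTL.
    pub fn evict_expired(&mut self, now: Instant) {
        let ttl = self.ttl;
        self.entries.retain(|_, pooled_at| now.duration_since(*pooled_at) < ttl);
    }

    /// Number of entries currently held, expired or not.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Passing now explicitly keeps the sketch deterministic; a real pool would more likely read the clock itself.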
Source

pub async fn connect(&self, peer_id: PeerId) -> Result<(), NetworkError>

Initiates a connection to a peer with automatic pooling and reuse.

Enhanced connection establishment process:

  1. Check circuit breaker status
  2. Check pool for existing healthy connection
  3. Reuse if valid connection exists
  4. Create new connection if needed
  5. Apply connection limits and health checks
  6. Initialize health monitoring

§Arguments

  • peer_id - ID of the peer to connect to

§Connection Pooling

  • Reuses healthy connections when possible
  • Validates connection freshness and quality
  • Removes expired or unhealthy connections
  • Updates usage metrics and health scores

§Circuit Breaker Protection

  • Prevents connections to repeatedly failing peers
  • Implements exponential backoff
  • Automatic recovery testing

§Returns

  • Ok(()) - Connection established or reused
  • Err(_) - Connection failed or circuit breaker open
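The circuit breaker protection described above can be pictured as a small state machine. The sketch below is an assumption-laden illustration, not this crate's implementation: BreakerSketch, BreakerSketchState, the failure threshold, and the doubling cooldown are all hypothetical, and the real CircuitBreakerState variants are not listed on this page.

```rust
use std::time::{Duration, Instant};

/// Hypothetical states; the crate's `CircuitBreakerState` may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerSketchState {
    Closed,
    Open { until: Instant },
    HalfOpen,
}

pub struct BreakerSketch {
    state: BreakerSketchState,
    consecutive_failures: u32,
    failure_threshold: u32,
    base_cooldown: Duration,
    trips: u32,
}

impl BreakerSketch {
    pub fn new(failure_threshold: u32, base_cooldown: Duration) -> Self {
        Self {
            state: BreakerSketchState::Closed,
            consecutive_failures: 0,
            failure_threshold,
            base_cooldown,
            trips: 0,
        }
    }

    pub fn state(&self) -> BreakerSketchState {
        self.state
    }

    /// Returns true if a connection attempt may proceed at `now`.
    /// An open breaker whose cooldown has elapsed moves to half-open.
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            BreakerSketchState::Closed | BreakerSketchState::HalfOpen => true,
            BreakerSketchState::Open { until } if now >= until => {
                self.state = BreakerSketchState::HalfOpen;
                true
            }
            BreakerSketchState::Open { .. } => false,
        }
    }

    /// A success closes the breaker and resets the backoff.
    pub fn record_success(&mut self) {
        self.state = BreakerSketchState::Closed;
        self.consecutive_failures = 0;
        self.trips = 0;
    }

    /// A failed probe, or reaching the threshold, opens the breaker.
    /// The cooldown doubles on each trip, up to 64x the base value.
    pub fn record_failure(&mut self, now: Instant) {
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        let probe_failed = self.state == BreakerSketchState::HalfOpen;
        if probe_failed || self.consecutive_failures >= self.failure_threshold {
            let factor = 1u32 << self.trips.min(6);
            let cooldown = self.base_cooldown.saturating_mul(factor);
            self.state = BreakerSketchState::Open { until: now + cooldown };
            self.trips = self.trips.saturating_add(1);
        }
    }
}
```

In this sketch the half-open state admits every attempt until one of them reports back; a stricter variant would admit a single probe.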
Source

pub fn update_status(&self, peer_id: PeerId, status: ConnectionStatus)

Updates connection status for a peer with lock-free atomic guarantees.

Enhanced status update process:

  1. Update connection info with new status
  2. Update health and quality metrics
  3. Atomic metrics update
  4. Circuit breaker state management
  5. Event logging and monitoring

§Arguments

  • peer_id - ID of the peer to update
  • status - New connection status

The response_time and bytes_transferred values are not parameters of this method; pass them to update_status_with_metrics instead.

§Thread Safety

  • Status updates are lock-free and atomic
  • Metrics updates use parking_lot locks for better performance
  • Safe for concurrent access with minimal contention

§Health Tracking

Updates connection health scores, quality metrics, and circuit breaker states to ensure optimal connection management.

Source

pub fn update_status_with_metrics( &self, peer_id: PeerId, status: ConnectionStatus, response_time: Option<Duration>, bytes_transferred: u64, )

Updates connection status with detailed performance metrics

Source

pub fn disconnect(&self, peer_id: &PeerId)

Disconnects from a peer with enhanced cleanup and health tracking

Source

pub fn connection_count(&self) -> usize

Returns connection count (lock-free)
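One way a connection count can be read and bounded without a lock is an atomic counter updated with a compare-and-swap loop. The following is a minimal sketch of that idea; ConnCounterSketch and its methods are hypothetical, and this page does not say which data structure the crate actually uses.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical lock-free counter with an upper bound.
pub struct ConnCounterSketch {
    active: AtomicUsize,
    max: usize,
}

impl ConnCounterSketch {
    pub fn new(max: usize) -> Self {
        Self { active: AtomicUsize::new(0), max }
    }

    /// Tries to reserve one slot; returns false once `max` is reached.
    /// `fetch_update` retries the closure until the swap succeeds or it
    /// returns `None`, so no thread ever blocks on a mutex.
    pub fn try_acquire(&self) -> bool {
        self.active
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < self.max { Some(n + 1) } else { None }
            })
            .is_ok()
    }

    /// Releases one slot; a release at zero is ignored rather than wrapping.
    pub fn release(&self) {
        let _ = self
            .active
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1));
    }

    /// Current number of reserved slots.
    pub fn count(&self) -> usize {
        self.active.load(Ordering::Acquire)
    }
}
```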

Source

pub fn get_status(&self, peer_id: &PeerId) -> Option<ConnectionStatus>

Returns connection status for a peer (lock-free)

Source

pub fn get_connection_info(&self, peer_id: &PeerId) -> Option<ConnectionInfo>

Returns detailed connection information for a peer (lock-free)

Source

pub fn get_quality_score(&self, peer_id: &PeerId) -> Option<f64>

Get connection quality score for a peer

Source

pub fn get_circuit_breaker_state( &self, peer_id: &PeerId, ) -> Option<CircuitBreakerState>

Get circuit breaker state for a peer

Source

pub fn get_healthy_connections(&self) -> Vec<(PeerId, f64)>

Get all healthy connections sorted by quality score
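A result shaped like Vec<(PeerId, f64)> could be produced by filtering on a score threshold and sorting by score. The sketch below assumes best-first ordering and a caller-supplied threshold; neither is stated on this page, and healthy_sorted with its u64 peer ids is a hypothetical stand-in.

```rust
/// Keeps peers whose score is at least `min_score` and orders them best
/// first. NaN scores fail the comparison and are dropped.
pub fn healthy_sorted(mut peers: Vec<(u64, f64)>, min_score: f64) -> Vec<(u64, f64)> {
    peers.retain(|(_, score)| *score >= min_score);
    // `total_cmp` gives f64 a total order, so the sort cannot panic.
    peers.sort_by(|a, b| b.1.total_cmp(&a.1));
    peers
}
```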

Source

pub fn update_metrics(&self, messages_per_second: f64, avg_latency_ms: u64)

Updates network metrics with the given throughput (messages per second) and average latency (milliseconds), using optimized locking

Source

pub fn get_queue_metrics(&self) -> QueueMetrics

Get current queue metrics

Source

pub fn get_latency_metrics(&self) -> LatencyMetrics

Get current latency metrics

Source

pub fn get_throughput_metrics(&self) -> ThroughputMetrics

Get current throughput metrics

Source

pub fn get_metrics(&self) -> NetworkMetrics

Returns the current network metrics

Source

pub async fn open_stream( &self, peer_id: PeerId, priority: Priority, ) -> Result<StreamId, NetworkError>

Open a multiplexed stream on a connection to a peer, with the given priority

Source

pub async fn close_stream( &self, stream_id: StreamId, ) -> Result<(), NetworkError>

Close a multiplexed stream

Source

pub async fn send_stream_data( &self, stream_id: StreamId, data: Bytes, ) -> Result<(), NetworkError>

Send data on a specific stream
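Since streams are opened with a priority, pending work on a shared connection has to be ordered somehow. One common approach, sketched here as an assumption rather than as this crate's scheduler, is a binary heap ordered by priority and then by arrival. PrioritySketch, StreamQueueSketch, and the u64 stream ids are hypothetical; the crate's Priority and StreamId types are not shown on this page.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical priority levels; derived ordering is Low < Normal < High.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PrioritySketch {
    Low,
    Normal,
    High,
}

/// Hypothetical queue of stream ids waiting to send.
pub struct StreamQueueSketch {
    next_seq: u64,
    // Max-heap on (priority, Reverse(seq)): higher priority first, and
    // within one priority the earliest arrival first.
    heap: BinaryHeap<(PrioritySketch, Reverse<u64>, u64)>,
}

impl StreamQueueSketch {
    pub fn new() -> Self {
        Self { next_seq: 0, heap: BinaryHeap::new() }
    }

    /// Enqueues a stream id with the given priority.
    pub fn push(&mut self, stream_id: u64, priority: PrioritySketch) {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.heap.push((priority, Reverse(seq), stream_id));
    }

    /// Removes and returns the next stream id to service, if any.
    pub fn pop(&mut self) -> Option<u64> {
        self.heap.pop().map(|(_, _, stream_id)| stream_id)
    }
}
```

The sequence number is what keeps streams of equal priority in first-in, first-out order; without it the heap would be free to return them in any order.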

Source

pub async fn retry_connect(&self, peer_id: PeerId) -> Result<(), NetworkError>

Connect to a peer with retry logic
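The overview mentions retry logic with exponential backoff and jitter. A delay schedule of that kind might look like the sketch below. The function name backoff_delay, its parameters, and the choice to keep half of each delay fixed while randomizing the other half are assumptions made for illustration; the crate's actual retry policy is not documented here. The caller supplies the random jitter value so the sketch needs no external crate.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`,
/// capped at `max`, then scaled into 50%..=100% of that value by
/// `jitter`, which the caller draws from 0.0..=1.0.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration, jitter: f64) -> Duration {
    // Saturate instead of overflowing for large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let capped = base.checked_mul(factor).unwrap_or(max).min(max);
    // Treat a non-finite jitter as "no reduction" rather than panicking.
    let j = if jitter.is_finite() { jitter.clamp(0.0, 1.0) } else { 1.0 };
    capped.mul_f64(0.5 + 0.5 * j)
}
```

Randomizing part of the delay spreads out retries from many clients that failed at the same moment, while the cap keeps the worst-case wait bounded.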

Source

pub async fn select_best_connection( &self, available_peers: &[PeerId], ) -> Option<PeerId>

Select best connection using load balancer
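This page does not say which load-balancing strategy is used. As one plausible illustration, the sketch below picks the candidate with the fewest in-flight requests and breaks ties by the higher quality score. PeerLoadSketch, select_best, and the u64 peer ids are hypothetical names, and the strategy itself is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical per-peer load record.
#[derive(Debug, Clone, Copy)]
pub struct PeerLoadSketch {
    pub in_flight: u32,
    pub quality: f64,
}

/// Returns the available peer with the fewest in-flight requests,
/// preferring the higher quality score on ties. Peers without a stats
/// entry are skipped; returns `None` if no candidate remains.
pub fn select_best(available: &[u64], stats: &HashMap<u64, PeerLoadSketch>) -> Option<u64> {
    available
        .iter()
        .filter_map(|id| stats.get(id).map(|s| (*id, *s)))
        .min_by(|a, b| {
            a.1.in_flight
                .cmp(&b.1.in_flight)
                // Reversed operands: a higher quality sorts as "smaller".
                .then_with(|| b.1.quality.total_cmp(&a.1.quality))
        })
        .map(|(id, _)| id)
}
```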

Source

pub async fn start_health_monitoring(&self, peer_id: PeerId)

Start health monitoring for a peer

Source

pub async fn check_connection_health( &self, peer_id: PeerId, ) -> Option<HealthCheckResult>

Perform health check on a connection

Source

pub async fn warm_connections( &self, peer_id: PeerId, ) -> Result<(), NetworkError>

Warm up connections for a peer

Source

pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState

Get warming state for a peer

Source

pub async fn get_connection_statistics(&self) -> ConnectionStatistics

Get comprehensive connection statistics

Source

pub fn set_connection_limits(&mut self, limits: ConnectionLimits)

Configure connection limits

Source

pub fn get_connection_limits(&self) -> &ConnectionLimits

Get current connection limits

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where T: 'a,

Source§

fn explicit(self, class: Class, tag: u32) -> TaggedParser<'a, Explicit, Self, E>

Source§

impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where T: 'a,

Source§

fn implicit( self, class: Class, constructed: bool, tag: u32, ) -> TaggedParser<'a, Implicit, Self, E>

Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
Source§

impl<T> ErasedDestructor for T
where T: 'static,