#![ allow( dead_code, missing_debug_implementations, async_fn_in_trait, missing_docs ) ]
mod private
{
use serde::{ Deserialize, Serialize };
use std::collections::{ HashMap, VecDeque };
use std::sync::{ Arc, RwLock };
use core::sync::atomic::{ AtomicU64, AtomicBool, AtomicUsize, Ordering };
use core::time::Duration;
use std::time::Instant;
use tokio::sync::{ mpsc, Semaphore };
use tokio::time::sleep;
pub use crate::models::websocket_streaming::
{
WebSocketConnectionState, WebSocketConfig, WebSocketMessage
};
/// Abstraction over a pool that hands out shared WebSocket connections.
///
/// Implementors must be `Send + Sync`. The asynchronous methods use
/// `async fn` directly in the trait (the file-level attribute allows the
/// `async_fn_in_trait` lint).
pub trait ConnectionPool : Send + Sync
{
/// Obtains a connection for `endpoint`.
///
/// # Errors
/// Returns a `crate::error::Error` when no connection can be provided.
async fn get_connection( &self, endpoint : &str ) -> Result< Arc< OptimizedWebSocketConnection >, crate::error::Error >;
/// Hands a connection back to the pool.
///
/// # Errors
/// Returns a `crate::error::Error` if the implementation fails to accept it.
async fn return_connection( &self, connection : Arc< OptimizedWebSocketConnection > ) -> Result< (), crate::error::Error >;
/// Returns a snapshot of the pool statistics.
fn get_stats( &self ) -> ConnectionPoolStats;
/// Runs one cleanup pass and reports a count as `usize`.
///
/// In the implementation visible in this file the count is the number of
/// idle connections that were removed.
async fn cleanup( &self ) -> Result< usize, crate::error::Error >;
}
/// Serialization strategy attached to a connection.
///
/// NOTE(review): the `serialize` / `deserialize` methods in this file encode
/// every variant with `serde_json`; the compression fields are stored on the
/// variants but are not applied to the produced bytes.
#[ derive( Debug, Clone ) ]
pub enum MessageSerializerType
{
/// "Binary JSON" mode carrying a compression flag and a compression level.
BinaryJson { enable_compression : bool, compression_level : u8 },
/// MessagePack mode carrying a compression flag.
MessagePack { enable_compression : bool },
/// Plain JSON; reports no compression support.
Json,
}
impl MessageSerializerType
{
  /// Serializes `message` into a byte vector.
  ///
  /// Every variant is currently encoded as JSON through `serde_json::to_vec`.
  /// The compression flag and level carried by `BinaryJson` / `MessagePack`
  /// are not applied: the bytes returned are the uncompressed JSON encoding.
  ///
  /// # Errors
  /// Returns `crate::error::Error::SerializationError` carrying the
  /// `serde_json` error text when encoding fails.
  pub fn serialize< T >( &self, message : &T ) -> Result< Vec< u8 >, crate::error::Error >
  where
    T : Serialize,
  {
    match self
    {
      // All formats share the JSON encoder; the previous per-variant arms
      // (including the compression branch) produced exactly the same bytes.
      Self::BinaryJson { .. } | Self::MessagePack { .. } | Self::Json =>
      {
        serde_json::to_vec( message )
        .map_err( | e | crate::error::Error::SerializationError( e.to_string() ) )
      },
    }
  }

  /// Deserializes a value of type `T` from `data`.
  ///
  /// Mirrors [`Self::serialize`]: every variant decodes JSON through
  /// `serde_json::from_slice`.
  ///
  /// # Errors
  /// Returns `crate::error::Error::DeserializationError` carrying the
  /// `serde_json` error text when decoding fails.
  pub fn deserialize< T >( &self, data : &[ u8 ] ) -> Result< T, crate::error::Error >
  where
    T : for< 'de > Deserialize< 'de >,
  {
    match self
    {
      Self::BinaryJson { .. } | Self::MessagePack { .. } | Self::Json =>
      {
        serde_json::from_slice( data )
        .map_err( | e | crate::error::Error::DeserializationError( e.to_string() ) )
      },
    }
  }

  /// Returns a stable textual identifier for the selected format:
  /// `"binary_json"`, `"messagepack"` or `"json"`.
  #[ inline ]
  #[ must_use ]
  pub fn format_id( &self ) -> &'static str
  {
    match self
    {
      Self::BinaryJson { .. } => "binary_json",
      Self::MessagePack { .. } => "messagepack",
      Self::Json => "json",
    }
  }

  /// Reports the `enable_compression` flag stored on the variant.
  ///
  /// `Json` carries no such flag and always yields `false`.
  #[ inline ]
  #[ must_use ]
  pub fn supports_compression( &self ) -> bool
  {
    match self
    {
      Self::BinaryJson { enable_compression, .. }
      | Self::MessagePack { enable_compression } => *enable_compression,
      Self::Json => false,
    }
  }
}
/// Snapshot of connection-pool counters as produced by a pool's `get_stats`.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq ) ]
pub struct ConnectionPoolStats
{
/// Total connection count as reported by the pool that produced the snapshot.
pub total_connections : usize,
/// Connections currently handed out to callers.
pub active_connections : usize,
/// Connections sitting idle inside the pool.
pub idle_connections : usize,
/// Number of connections the pool has created.
pub connections_created : u64,
/// Number of times a pooled connection was reused.
pub connections_reused : u64,
/// Reused requests divided by all requests (created + reused).
pub hit_ratio : f64,
/// Average age, in seconds, of the connections considered by the producer.
pub avg_connection_age_seconds : f64,
}
/// Aggregate configuration for the optimized WebSocket layer.
///
/// Bundles the base `WebSocketConfig` with pool, message, monitoring and
/// resource settings.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq ) ]
pub struct OptimizedWebSocketConfig
{
/// Base WebSocket configuration re-exported from `crate::models::websocket_streaming`.
pub base : WebSocketConfig,
/// Settings for the connection pool.
pub pool_config : ConnectionPoolConfig,
/// Settings that select the message serializer (format, compression, batching).
pub message_config : MessageOptimizationConfig,
/// Monitoring-related settings.
pub monitoring_config : WebSocketMonitoringConfig,
/// Resource-limit settings.
pub resource_config : ResourceManagementConfig,
}
/// Settings that drive `OptimizedConnectionPool`.
///
/// The derived `Default` sets every numeric field to `0` and every flag to
/// `false`; tuned values are supplied by `OptimizedWebSocketConfig::default()`.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq, Default ) ]
pub struct ConnectionPoolConfig
{
/// Upper bound on idle connections kept per endpoint when a connection is returned.
pub max_connections_per_endpoint : usize,
/// Number of permits given to the pool's semaphore at construction.
pub max_total_connections : usize,
/// Idle time, in seconds, after which cleanup discards a pooled connection.
pub max_idle_time_seconds : u64,
/// Pause, in seconds, between passes of the background cleanup task.
pub cleanup_interval_seconds : u64,
/// NOTE(review): not read by any code visible in this file — presumably enables pre-opening connections; confirm.
pub enable_connection_warming : bool,
/// NOTE(review): not read by any code visible in this file — presumably a warm-pool floor; confirm.
pub min_connections_per_endpoint : usize,
}
/// Settings that control how messages are encoded.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq, Default ) ]
pub struct MessageOptimizationConfig
{
/// Wire format; `OptimizedWebSocketConnection::new` maps it to a `MessageSerializerType`.
pub serialization_format : SerializationFormat,
/// Compression flag copied into the serializer variant when the format carries one.
pub enable_compression : bool,
/// Compression level copied into the `BinaryJson` serializer variant.
pub compression_level : u8,
/// NOTE(review): batching fields are not read by code visible in this file; semantics to be confirmed.
pub enable_batching : bool,
/// Maximum number of messages per batch (see note on `enable_batching`).
pub max_batch_size : usize,
/// Maximum batching delay in milliseconds (see note on `enable_batching`).
pub max_batch_delay_ms : u64,
}
/// Monitoring settings.
///
/// NOTE(review): none of these fields is read by code visible in this file;
/// the field descriptions below follow the field names and should be
/// confirmed against the consumer.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq ) ]
pub struct WebSocketMonitoringConfig
{
/// Whether metrics collection is enabled.
pub enable_metrics : bool,
/// Metrics collection interval in seconds.
pub metrics_interval_seconds : u64,
/// Whether connection lifecycle events are logged.
pub enable_lifecycle_logging : bool,
/// Whether individual messages are traced.
pub enable_message_tracing : bool,
/// Maximum number of retained metrics entries.
pub max_metrics_history : usize,
}
/// Resource-limit settings.
///
/// NOTE(review): none of these fields is read by code visible in this file;
/// the field descriptions below follow the field names and should be
/// confirmed against the consumer.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq ) ]
pub struct ResourceManagementConfig
{
/// Maximum number of concurrent connections.
pub max_concurrent_connections : usize,
/// Memory limit in bytes.
pub memory_limit_bytes : u64,
/// Whether automatic scaling is enabled.
pub enable_auto_scaling : bool,
/// CPU threshold — presumably a fraction in `0.0..=1.0` (the default is `0.8`); confirm.
pub cpu_threshold : f64,
/// Memory threshold — presumably a fraction in `0.0..=1.0` (the default is `0.8`); confirm.
pub memory_threshold : f64,
}
/// Serialization formats selectable through `MessageOptimizationConfig`.
///
/// `OptimizedWebSocketConnection::new` maps `BinaryJson` and `MessagePack` to
/// the matching `MessageSerializerType` variant; every other value is mapped
/// to the plain JSON serializer.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq, Default ) ]
pub enum SerializationFormat
{
/// Plain JSON (the derived default).
#[ default ]
Json,
/// Binary JSON.
BinaryJson,
/// MessagePack.
MessagePack,
/// Protocol Buffers (falls back to the JSON serializer in this file).
ProtocolBuffers,
/// Custom binary format (falls back to the JSON serializer in this file).
CustomBinary,
}
/// Tuned defaults for the optimized WebSocket layer.
impl Default for OptimizedWebSocketConfig
{
  /// Builds the configuration section by section and assembles it at the end.
  fn default() -> Self
  {
    let base = WebSocketConfig::default();

    // Pool: 10 idle connections per endpoint, 100 overall, 5 minute idle
    // limit, cleanup once a minute.
    let pool_config = ConnectionPoolConfig
    {
      max_connections_per_endpoint : 10,
      max_total_connections : 100,
      max_idle_time_seconds : 300,
      cleanup_interval_seconds : 60,
      enable_connection_warming : true,
      min_connections_per_endpoint : 2,
    };

    // Messages: binary JSON, compression level 6, batches of up to 10
    // messages or 100 ms.
    let message_config = MessageOptimizationConfig
    {
      serialization_format : SerializationFormat::BinaryJson,
      enable_compression : true,
      compression_level : 6,
      enable_batching : true,
      max_batch_size : 10,
      max_batch_delay_ms : 100,
    };

    // Monitoring: metrics every minute, lifecycle logging on, tracing off.
    let monitoring_config = WebSocketMonitoringConfig
    {
      enable_metrics : true,
      metrics_interval_seconds : 60,
      enable_lifecycle_logging : true,
      enable_message_tracing : false,
      max_metrics_history : 1000,
    };

    // Resources: 1000 connections, 512 MiB, 80 % CPU / memory thresholds.
    let resource_config = ResourceManagementConfig
    {
      max_concurrent_connections : 1000,
      memory_limit_bytes : 1024 * 1024 * 512,
      enable_auto_scaling : true,
      cpu_threshold : 0.8,
      memory_threshold : 0.8,
    };

    Self
    {
      base,
      pool_config,
      message_config,
      monitoring_config,
      resource_config,
    }
  }
}
/// Connection pool that keeps idle WebSocket connections per endpoint.
#[ derive( Debug ) ]
pub struct OptimizedConnectionPool
{
// Idle connections, keyed by endpoint string; each queue is used front-out / back-in.
pools : Arc< RwLock< HashMap< String, VecDeque< PooledConnection > > > >,
// Pool settings supplied at construction.
config : ConnectionPoolConfig,
// Base statistics snapshot; the inherent `get_stats` clones it and overlays live counters.
stats : Arc< RwLock< ConnectionPoolStats > >,
// Number of connections currently handed out.
active_count : Arc< AtomicUsize >,
// Number of connections created by the pool.
created_count : Arc< AtomicU64 >,
// Number of times a pooled connection was reused.
reused_count : Arc< AtomicU64 >,
// Semaphore created with `max_total_connections` permits; `get_connection` takes a permit via `try_acquire`.
connection_semaphore : Arc< Semaphore >,
// Set by `start_cleanup_task` so that the background task is only spawned once.
cleanup_running : Arc< AtomicBool >,
}
/// An idle connection stored in the pool together with bookkeeping data.
#[ derive( Debug ) ]
struct PooledConnection
{
// The shared connection handle.
connection : Arc< OptimizedWebSocketConnection >,
// Timestamp compared against the idle limit during cleanup.
last_used : Instant,
// Timestamp used for the average-age statistic; set when the entry is inserted into the pool.
created_at : Instant,
// Usage counter for this entry.
usage_count : u64,
}
/// A WebSocket connection handle with serializer, metrics and health state.
#[ derive( Debug ) ]
pub struct OptimizedWebSocketConnection
{
/// Unique identifier; `new` generates it as `ws_opt_<uuid v4>`.
pub id : String,
/// Endpoint this connection was created for.
pub endpoint : String,
// Connection state; initialised to `Connecting` by `new`.
state : Arc< RwLock< WebSocketConnectionState > >,
// Configuration the connection was created with.
config : OptimizedWebSocketConfig,
// Serializer selected from `config.message_config` at construction.
serializer : MessageSerializerType,
// Per-connection counters updated by `send_message`.
metrics : Arc< RwLock< ConnectionMetrics > >,
// Outgoing message channel; `new` leaves it as `None`.
message_sender : Option< mpsc::UnboundedSender< WebSocketMessage > >,
// Health state consulted by the pool when a connection is returned.
health_checker : Arc< ConnectionHealthChecker >,
}
/// Per-connection counters.
///
/// In the code visible in this file only `messages_sent` and `bytes_sent`
/// are updated (by `OptimizedWebSocketConnection::send_message`); the other
/// fields keep their initial values unless updated elsewhere.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq ) ]
pub struct ConnectionMetrics
{
/// Number of messages passed to `send_message`.
pub messages_sent : u64,
/// Number of messages received.
pub messages_received : u64,
/// Total size, in bytes, of the serialized messages passed to `send_message`.
pub bytes_sent : u64,
/// Total bytes received.
pub bytes_received : u64,
/// Average latency in milliseconds.
pub avg_latency_ms : f64,
/// Connection uptime in seconds.
pub uptime_seconds : u64,
/// Number of reconnections.
pub reconnection_count : u32,
/// Text of the most recent error, if any.
pub last_error : Option< String >,
/// Message throughput per second.
pub throughput_msg_per_sec : f64,
/// Bandwidth in bytes per second.
pub bandwidth_bytes_per_sec : f64,
}
/// Health bookkeeping attached to a connection.
///
/// `OptimizedWebSocketConnection::new` creates it with a 30 second interval,
/// a limit of 3 failed checks and the healthy flag set. No code visible in
/// this file performs the checks themselves.
#[ derive( Debug ) ]
pub struct ConnectionHealthChecker
{
// Time of the last heartbeat, if any.
last_heartbeat : Arc< RwLock< Option< Instant > > >,
// Interval between health checks.
check_interval : Duration,
// Counter of failed checks.
failed_checks : Arc< AtomicU64 >,
// NOTE(review): presumably the number of failures after which the connection counts as unhealthy — confirm.
max_failed_checks : u64,
// Flag read by the pool to decide whether a returned connection may be pooled.
is_healthy : Arc< AtomicBool >,
}
impl OptimizedConnectionPool
{
/// Creates an empty pool governed by `config`.
///
/// The semaphore is sized from `config.max_total_connections`; all counters
/// and the statistics snapshot start at zero.
#[ inline ]
#[ must_use ]
pub fn new( config : ConnectionPoolConfig ) -> Self
{
  // Read the permit count before `config` is moved into the struct.
  let permits = config.max_total_connections;

  let empty_stats = ConnectionPoolStats
  {
    total_connections : 0,
    active_connections : 0,
    idle_connections : 0,
    connections_created : 0,
    connections_reused : 0,
    hit_ratio : 0.0,
    avg_connection_age_seconds : 0.0,
  };

  let pools = Arc::new( RwLock::new( HashMap::new() ) );
  let stats = Arc::new( RwLock::new( empty_stats ) );
  let active_count = Arc::new( AtomicUsize::new( 0 ) );
  let created_count = Arc::new( AtomicU64::new( 0 ) );
  let reused_count = Arc::new( AtomicU64::new( 0 ) );
  let connection_semaphore = Arc::new( Semaphore::new( permits ) );
  let cleanup_running = Arc::new( AtomicBool::new( false ) );

  Self
  {
    pools,
    config,
    stats,
    active_count,
    created_count,
    reused_count,
    connection_semaphore,
    cleanup_running,
  }
}
/// Returns a statistics snapshot built from the live counters and the
/// current contents of the idle queues.
///
/// * `active_connections`, `connections_created`, `connections_reused` are
///   read from the atomic counters.
/// * `idle_connections` is the number of pooled entries across all endpoints.
/// * `total_connections` is `active_connections + idle_connections`.
/// * `avg_connection_age_seconds` is the mean time since `created_at` over
///   the pooled entries, or `0.0` when the pool is empty.
/// * `hit_ratio` is `reused / ( created + reused )`, or the stored value
///   when no request has been served yet.
///
/// A poisoned lock is recovered rather than skipped, so the snapshot always
/// reflects the stored data.
#[ inline ]
#[ must_use ]
pub fn get_stats( &self ) -> ConnectionPoolStats
{
  let mut stats = self.stats.read().unwrap_or_else( | poisoned | poisoned.into_inner() ).clone();
  stats.active_connections = self.active_count.load( Ordering::Relaxed );
  stats.connections_created = self.created_count.load( Ordering::Relaxed );
  stats.connections_reused = self.reused_count.load( Ordering::Relaxed );

  // Scope the read guard so that it is released before the remaining arithmetic.
  let ( idle, total_age_seconds ) =
  {
    let pools_lock = self.pools.read().unwrap_or_else( | poisoned | poisoned.into_inner() );
    let now = Instant::now();
    pools_lock
    .values()
    .flatten()
    .fold( ( 0_usize, 0.0_f64 ), | ( count, age ), conn |
    {
      ( count + 1, age + now.saturating_duration_since( conn.created_at ).as_secs_f64() )
    } )
  };

  stats.idle_connections = idle;
  // Previously this field was never populated and always reported zero.
  stats.total_connections = idle.saturating_add( stats.active_connections );
  stats.avg_connection_age_seconds = if idle > 0
  {
    total_age_seconds / idle as f64
  }
  else
  {
    0.0
  };

  let total_requests = stats.connections_created.saturating_add( stats.connections_reused );
  if total_requests > 0
  {
    stats.hit_ratio = stats.connections_reused as f64 / total_requests as f64;
  }
  stats
}
/// Spawns a background task that periodically evicts idle connections.
///
/// Only the first call spawns a task; later calls return immediately because
/// `cleanup_running` is already set. The task
///
/// * holds only weak references between passes, so it ends once the pool's
///   shared state has been dropped, and never keeps that state alive while
///   it sleeps;
/// * stops when `cleanup_running` is observed to be `false`;
/// * removes entries idle for `max_idle_time_seconds` or longer and drops
///   endpoint queues that became empty;
/// * recovers a poisoned lock instead of skipping the pass.
///
/// A `cleanup_interval_seconds` of `0` (the derived default of
/// `ConnectionPoolConfig`) is treated as one second so that the loop cannot
/// degrade into a busy spin on the write lock.
///
/// Must be called from within a Tokio runtime, as it uses `tokio::spawn`.
pub async fn start_cleanup_task( &self )
{
  if self.cleanup_running.swap( true, Ordering::Relaxed )
  {
    return;
  }

  let weak_pools = Arc::downgrade( &self.pools );
  let weak_running = Arc::downgrade( &self.cleanup_running );
  let max_idle_time = Duration::from_secs( self.config.max_idle_time_seconds );
  let cleanup_interval = Duration::from_secs( self.config.cleanup_interval_seconds.max( 1 ) );

  tokio::spawn( async move
  {
    loop
    {
      // Inner scope: the upgraded `Arc`s and the lock guard are dropped
      // before the task goes to sleep.
      {
        let ( Some( pools ), Some( cleanup_running ) ) = ( weak_pools.upgrade(), weak_running.upgrade() )
        else
        {
          break;
        };
        if !cleanup_running.load( Ordering::Relaxed )
        {
          break;
        }

        let mut pools_lock = pools.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
        let now = Instant::now();
        for pool in pools_lock.values_mut()
        {
          pool.retain( | conn | now.saturating_duration_since( conn.last_used ) < max_idle_time );
        }
        // Do not keep empty queues around for endpoints that are no longer used.
        pools_lock.retain( | _endpoint, pool | !pool.is_empty() );
      }
      sleep( cleanup_interval ).await;
    }
  } );
}
}
impl ConnectionPool for OptimizedConnectionPool
{
/// Returns a connection for `endpoint`, reusing an idle one when possible.
///
/// Idle entries are taken from the front of the endpoint's queue. Entries
/// whose health flag is no longer set are discarded instead of being handed
/// out. When no usable idle connection exists a new one is created with
/// `OptimizedWebSocketConfig::default()`.
///
/// A poisoned pool lock is recovered, so the idle queue is still consulted.
///
/// # Errors
/// * `crate::error::Error::ServerError( "Connection pool exhausted" )` when
///   no semaphore permit is available.
/// * Any error returned by `OptimizedWebSocketConnection::new`.
async fn get_connection( &self, endpoint : &str ) -> Result< Arc< OptimizedWebSocketConnection >, crate::error::Error >
{
  // NOTE(review): the permit is released when this function returns, so the
  // semaphore bounds concurrent calls rather than outstanding connections.
  // Enforcing `max_total_connections` would require keeping the permit until
  // the connection is returned or dropped.
  let _permit = self.connection_semaphore.try_acquire()
  .map_err( | _ | crate::error::Error::ServerError( "Connection pool exhausted".to_string() ) )?;

  // The guard lives only inside this block, i.e. it is released before the
  // `.await` below.
  let reusable =
  {
    let mut pools_lock = self.pools.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
    let mut found = None;
    if let Some( pool ) = pools_lock.get_mut( endpoint )
    {
      while let Some( pooled_conn ) = pool.pop_front()
      {
        if pooled_conn.connection.health_checker.is_healthy.load( Ordering::Relaxed )
        {
          found = Some( pooled_conn.connection );
          break;
        }
        // Unhealthy entry: dropped here, keep looking.
      }
    }
    found
  };

  if let Some( connection ) = reusable
  {
    self.reused_count.fetch_add( 1, Ordering::Relaxed );
    self.active_count.fetch_add( 1, Ordering::Relaxed );
    return Ok( connection );
  }

  let connection = Arc::new( OptimizedWebSocketConnection::new(
    endpoint,
    OptimizedWebSocketConfig::default()
  ).await? );
  self.created_count.fetch_add( 1, Ordering::Relaxed );
  self.active_count.fetch_add( 1, Ordering::Relaxed );
  Ok( connection )
}
/// Gives `connection` back to the pool.
///
/// The active counter is decremented without wrapping: an unmatched call
/// leaves it at zero instead of underflowing to `usize::MAX`. Unhealthy
/// connections are dropped. Healthy ones are appended to the endpoint's idle
/// queue unless it already holds `max_connections_per_endpoint` entries, in
/// which case the connection is dropped as well.
///
/// A poisoned pool lock is recovered, so the connection is still pooled.
///
/// # Errors
/// Currently always returns `Ok( () )`.
async fn return_connection( &self, connection : Arc< OptimizedWebSocketConnection > ) -> Result< (), crate::error::Error >
{
  // `checked_sub` yields `None` at zero, which makes `fetch_update` leave
  // the counter untouched; that outcome is intentionally ignored.
  let _ = self.active_count.fetch_update
  (
    Ordering::Relaxed,
    Ordering::Relaxed,
    | current | current.checked_sub( 1 ),
  );

  if !connection.health_checker.is_healthy.load( Ordering::Relaxed )
  {
    return Ok( () );
  }

  let mut pools_lock = self.pools.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
  let pool = pools_lock.entry( connection.endpoint.clone() ).or_default();
  if pool.len() < self.config.max_connections_per_endpoint
  {
    // NOTE(review): the connection does not carry its original creation
    // time, so `created_at` restarts on every return.
    let now = Instant::now();
    pool.push_back( PooledConnection
    {
      connection,
      last_used : now,
      created_at : now,
      usage_count : 1,
    } );
  }
  Ok( () )
}
/// Returns a statistics snapshot computed from the live counters and the
/// idle queues.
///
/// * `idle_connections` is the number of pooled entries.
/// * `total_connections` is `active_connections + idle_connections`.
/// * `avg_connection_age_seconds` is the mean time since `created_at` over
///   the pooled entries (`0.0` for an empty pool); it was previously fixed
///   at `0.0`.
/// * `hit_ratio` is `reused / ( created + reused )`, `0.0` before the first
///   request.
fn get_stats( &self ) -> ConnectionPoolStats
{
  let connections_created = self.created_count.load( Ordering::Relaxed );
  let connections_reused = self.reused_count.load( Ordering::Relaxed );
  let active_connections = self.active_count.load( Ordering::Relaxed );

  let total_requests = connections_created.saturating_add( connections_reused );
  let hit_ratio = if total_requests > 0
  {
    connections_reused as f64 / total_requests as f64
  }
  else
  {
    0.0
  };

  // Scope the read guard to the traversal of the idle queues.
  let ( idle_connections, total_age_seconds ) =
  {
    let pools_lock = self.pools.read().unwrap_or_else( | poisoned | poisoned.into_inner() );
    let now = Instant::now();
    pools_lock
    .values()
    .flatten()
    .fold( ( 0_usize, 0.0_f64 ), | ( count, age ), conn |
    {
      ( count + 1, age + now.saturating_duration_since( conn.created_at ).as_secs_f64() )
    } )
  };

  let avg_connection_age_seconds = if idle_connections > 0
  {
    total_age_seconds / idle_connections as f64
  }
  else
  {
    0.0
  };

  ConnectionPoolStats
  {
    total_connections : idle_connections.saturating_add( active_connections ),
    active_connections,
    idle_connections,
    connections_created,
    connections_reused,
    hit_ratio,
    avg_connection_age_seconds,
  }
}
/// Evicts pooled connections that have been idle for
/// `max_idle_time_seconds` or longer and returns how many were removed.
///
/// Endpoint queues left empty by the pass are removed from the map so that
/// endpoints which are no longer used do not accumulate. A poisoned lock is
/// recovered instead of turning the call into a silent no-op.
///
/// # Errors
/// Currently always returns `Ok`.
async fn cleanup( &self ) -> Result< usize, crate::error::Error >
{
  let max_idle_time = Duration::from_secs( self.config.max_idle_time_seconds );
  let mut pools_lock = self.pools.write().unwrap_or_else( | poisoned | poisoned.into_inner() );

  // Taken after the lock is held, so entries returned while this call was
  // waiting are not measured against an earlier timestamp.
  let now = Instant::now();
  let mut cleaned_count = 0_usize;
  for pool in pools_lock.values_mut()
  {
    let before = pool.len();
    pool.retain( | conn | now.saturating_duration_since( conn.last_used ) < max_idle_time );
    cleaned_count += before - pool.len();
  }
  pools_lock.retain( | _endpoint, pool | !pool.is_empty() );

  Ok( cleaned_count )
}
}
impl OptimizedWebSocketConnection
{
  /// Creates a connection handle for `endpoint`.
  ///
  /// The handle gets a fresh `ws_opt_<uuid v4>` identifier, starts in the
  /// `Connecting` state with zeroed metrics, has no message sender, and
  /// carries a health checker that is initially healthy (30 second interval,
  /// at most 3 failed checks).
  ///
  /// The serializer follows `config.message_config.serialization_format`:
  /// `BinaryJson` and `MessagePack` map to the matching serializer variant;
  /// `Json`, `ProtocolBuffers` and `CustomBinary` all use the JSON serializer.
  ///
  /// # Errors
  /// Currently always returns `Ok`.
  pub async fn new( endpoint : &str, config : OptimizedWebSocketConfig ) -> Result< Self, crate::error::Error >
  {
    let id = format!( "ws_opt_{}", uuid::Uuid::new_v4() );
    let serializer = match config.message_config.serialization_format
    {
      SerializationFormat::BinaryJson => MessageSerializerType::BinaryJson
      {
        enable_compression : config.message_config.enable_compression,
        compression_level : config.message_config.compression_level,
      },
      SerializationFormat::MessagePack => MessageSerializerType::MessagePack
      {
        enable_compression : config.message_config.enable_compression,
      },
      // Listed explicitly so that adding a format forces a decision here.
      SerializationFormat::Json
      | SerializationFormat::ProtocolBuffers
      | SerializationFormat::CustomBinary => MessageSerializerType::Json,
    };

    let metrics = ConnectionMetrics
    {
      messages_sent : 0,
      messages_received : 0,
      bytes_sent : 0,
      bytes_received : 0,
      avg_latency_ms : 0.0,
      uptime_seconds : 0,
      reconnection_count : 0,
      last_error : None,
      throughput_msg_per_sec : 0.0,
      bandwidth_bytes_per_sec : 0.0,
    };

    let health_checker = ConnectionHealthChecker
    {
      last_heartbeat : Arc::new( RwLock::new( Some( Instant::now() ) ) ),
      check_interval : Duration::from_secs( 30 ),
      failed_checks : Arc::new( AtomicU64::new( 0 ) ),
      max_failed_checks : 3,
      is_healthy : Arc::new( AtomicBool::new( true ) ),
    };

    Ok( Self
    {
      id,
      endpoint : endpoint.to_string(),
      state : Arc::new( RwLock::new( WebSocketConnectionState::Connecting ) ),
      config,
      serializer,
      metrics : Arc::new( RwLock::new( metrics ) ),
      message_sender : None,
      health_checker : Arc::new( health_checker ),
    } )
  }

  /// Serializes `message` and records it in the connection metrics.
  ///
  /// NOTE(review): in the code visible here nothing is transmitted — the
  /// message is only serialized and counted (`messages_sent`, `bytes_sent`).
  ///
  /// A poisoned metrics lock is recovered, consistent with
  /// [`Self::get_metrics`], so the counters are never silently skipped.
  ///
  /// # Errors
  /// Propagates the serializer's error when `message` cannot be encoded.
  pub async fn send_message< T >( &self, message : &T ) -> Result< (), crate::error::Error >
  where
    T : Serialize,
  {
    let serialized = self.serializer.serialize( message )?;
    let mut metrics = self.metrics.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
    metrics.messages_sent += 1;
    metrics.bytes_sent += serialized.len() as u64;
    Ok( () )
  }

  /// Returns a copy of the current connection metrics, recovering the data
  /// if the lock was poisoned.
  pub fn get_metrics( &self ) -> ConnectionMetrics
  {
    self.metrics.read().unwrap_or_else( | poisoned | poisoned.into_inner() ).clone()
  }
}
/// Streaming API facade that pairs a borrowed client with a connection pool.
///
/// `Debug` is implemented by hand further down in this file and prints only
/// the `config` field.
pub struct OptimizedWebSocketStreamingApi< 'a >
{
// Borrowed client; the facade cannot outlive it.
client : &'a crate::client::Client,
// Shared connection pool built from `config.pool_config`.
pool : Arc< OptimizedConnectionPool >,
// Full configuration the facade was created with.
config : OptimizedWebSocketConfig,
// Aggregate streaming counters; starts at zero.
metrics : Arc< RwLock< StreamingMetrics > >,
}
/// Aggregate streaming counters exposed by `OptimizedWebSocketStreamingApi`.
///
/// NOTE(review): no code visible in this file updates these values after
/// initialisation; the field descriptions follow the field names.
#[ derive( Debug, Clone, Serialize, Deserialize, PartialEq ) ]
pub struct StreamingMetrics
{
/// Total number of streaming sessions.
pub total_sessions : u64,
/// Number of currently active sessions.
pub active_sessions : u64,
/// Average session duration in seconds.
pub avg_session_duration_seconds : f64,
/// Total number of streamed messages.
pub total_messages_streamed : u64,
/// Average throughput in messages per second.
pub avg_throughput_msg_per_sec : f64,
/// Total number of transferred bytes.
pub total_bytes_transferred : u64,
/// Connection-pool hit ratio.
pub pool_hit_ratio : f64,
/// Error rate.
pub error_rate : f64,
}
impl< 'a > OptimizedWebSocketStreamingApi< 'a >
{
pub fn new( client : &'a crate::client::Client ) -> Self
{
Self::with_config( client, OptimizedWebSocketConfig::default() )
}
pub fn with_config( client : &'a crate::client::Client, config : OptimizedWebSocketConfig ) -> Self
{
let pool = Arc::new(
OptimizedConnectionPool::new( config.pool_config.clone() )
);
Self {
client,
pool,
config,
metrics : Arc::new( RwLock::new( StreamingMetrics {
total_sessions : 0,
active_sessions : 0,
avg_session_duration_seconds : 0.0,
total_messages_streamed : 0,
avg_throughput_msg_per_sec : 0.0,
total_bytes_transferred : 0,
pool_hit_ratio : 0.0,
error_rate : 0.0,
} ) ),
}
}
pub fn get_metrics( &self ) -> StreamingMetrics
{
self.metrics.read().unwrap_or_else( | poisoned | poisoned.into_inner() ).clone()
}
pub fn get_pool_stats( &self ) -> ConnectionPoolStats
{
self.pool.get_stats()
}
pub async fn cleanup_connections( &self ) -> Result< usize, crate::error::Error >
{
self.pool.cleanup().await
}
}
/// Manual `Debug`: prints only the configuration and marks the remaining
/// fields as omitted.
impl< 'a > std::fmt::Debug for OptimizedWebSocketStreamingApi< 'a >
{
  fn fmt( &self, f : &mut std::fmt::Formatter< '_ > ) -> std::fmt::Result
  {
    let mut repr = f.debug_struct( "OptimizedWebSocketStreamingApi" );
    repr.field( "config", &self.config );
    // Renders the trailing `..` for the fields that are not shown.
    repr.finish_non_exhaustive()
  }
}
/// All counters and rates start at zero.
impl Default for StreamingMetrics
{
  fn default() -> Self
  {
    // Named zeros keep the integer and floating-point fields apart.
    const NO_EVENTS : u64 = 0;
    const NO_RATE : f64 = 0.0;

    Self
    {
      total_sessions : NO_EVENTS,
      active_sessions : NO_EVENTS,
      avg_session_duration_seconds : NO_RATE,
      total_messages_streamed : NO_EVENTS,
      avg_throughput_msg_per_sec : NO_RATE,
      total_bytes_transferred : NO_EVENTS,
      pool_hit_ratio : NO_RATE,
      error_rate : NO_RATE,
    }
  }
}
}
pub use private::
{
ConnectionPool, MessageSerializerType, ConnectionPoolStats,
OptimizedWebSocketConfig, ConnectionPoolConfig, MessageOptimizationConfig,
WebSocketMonitoringConfig, ResourceManagementConfig, SerializationFormat,
OptimizedConnectionPool, OptimizedWebSocketConnection, ConnectionMetrics,
ConnectionHealthChecker, OptimizedWebSocketStreamingApi, StreamingMetrics
};