#![ allow( clippy::missing_inline_in_public_items ) ]
mod private
{
use std::
{
collections ::{ HashMap, VecDeque },
sync ::{ Arc, Mutex },
time ::Instant,
};
use core::time::Duration;
use serde::{ Deserialize, Serialize };
use tokio::sync::{ mpsc, watch };
#[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
pub enum WebSocketState
{
Connecting,
Connected,
Closing,
Closed,
Failed( String ),
}
#[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
pub enum WebSocketMessage
{
Text( String ),
Binary( Vec< u8 > ),
Ping( Vec< u8 > ),
Pong( Vec< u8 > ),
Close( Option< String > ),
}
impl WebSocketMessage
{
#[ must_use ]
pub fn as_text( &self ) -> Option< &str >
{
match self
{
WebSocketMessage::Text( text ) => Some( text ),
_ => None,
}
}
#[ must_use ]
pub fn as_binary( &self ) -> Option< &[ u8 ] >
{
match self
{
WebSocketMessage::Binary( data ) => Some( data ),
_ => None,
}
}
#[ must_use ]
pub fn is_control( &self ) -> bool
{
matches!( self, WebSocketMessage::Ping( _ ) | WebSocketMessage::Pong( _ ) | WebSocketMessage::Close( _ ) )
}
#[ must_use ]
pub fn size( &self ) -> usize
{
match self
{
WebSocketMessage::Text( text ) => text.len(),
WebSocketMessage::Binary( data ) | WebSocketMessage::Ping( data ) | WebSocketMessage::Pong( data ) => data.len(),
WebSocketMessage::Close( reason ) => reason.as_ref().map_or( 0, String::len ),
}
}
}
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct WebSocketConfig
{
pub connect_timeout_ms : u64,
pub max_message_size : usize,
pub heartbeat_interval_ms : u64,
pub max_queue_size : usize,
pub enable_compression : bool,
pub max_reconnect_attempts : u32,
pub reconnect_delay_ms : u64,
}
impl Default for WebSocketConfig
{
fn default() -> Self
{
Self
{
connect_timeout_ms : 30000,
max_message_size : 16 * 1024 * 1024, heartbeat_interval_ms : 30000,
max_queue_size : 1000,
enable_compression : true,
max_reconnect_attempts : 3,
reconnect_delay_ms : 1000,
}
}
}
#[ derive( Debug ) ]
pub struct WebSocketConnection
{
pub id : String,
pub url : String,
pub state : WebSocketState,
pub config : WebSocketConfig,
pub connected_at : Option< Instant >,
pub last_activity : Instant,
message_queue : Arc< Mutex< VecDeque< WebSocketMessage > > >,
}
impl WebSocketConnection
{
#[ must_use ]
pub fn new( id : String, url : String, config : WebSocketConfig ) -> Self
{
Self
{
id,
url,
state : WebSocketState::Connecting,
config,
connected_at : None,
last_activity : Instant::now(),
message_queue : Arc::new( Mutex::new( VecDeque::new() ) ),
}
}
pub fn update_state( &mut self, state : WebSocketState )
{
self.state = state;
self.last_activity = Instant::now();
if matches!( self.state, WebSocketState::Connected )
{
self.connected_at = Some( Instant::now() );
}
}
#[ must_use ]
pub fn connection_duration( &self ) -> Option< Duration >
{
self.connected_at.map( | connected | connected.elapsed() )
}
#[ must_use ]
pub fn idle_duration( &self ) -> Duration
{
self.last_activity.elapsed()
}
#[ must_use ]
pub fn is_active( &self ) -> bool
{
matches!( self.state, WebSocketState::Connected )
}
pub fn queue_message( &self, message : WebSocketMessage ) -> Result< (), String >
{
let mut queue = self.message_queue.lock().unwrap();
if queue.len() >= self.config.max_queue_size
{
return Err( "Message queue is full".to_string() );
}
if message.size() > self.config.max_message_size
{
return Err( "Message exceeds maximum size".to_string() );
}
queue.push_back( message );
Ok( () )
}
#[ must_use ]
#[ inline ]
pub fn dequeue_message( &self ) -> Option< WebSocketMessage >
{
let mut queue = self.message_queue.lock().unwrap();
queue.pop_front()
}
#[ must_use ]
#[ inline ]
pub fn queue_size( &self ) -> usize
{
self.message_queue.lock().unwrap().len()
}
#[ inline ]
pub fn clear_queue( &self )
{
let mut queue = self.message_queue.lock().unwrap();
queue.clear();
}
}
#[ derive( Debug ) ]
pub struct WebSocketPool
{
connections : HashMap< String, WebSocketConnection >,
config : WebSocketPoolConfig,
}
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct WebSocketPoolConfig
{
pub max_connections : usize,
pub idle_timeout_ms : u64,
pub cleanup_interval_ms : u64,
}
impl Default for WebSocketPoolConfig
{
fn default() -> Self
{
Self
{
max_connections : 100,
idle_timeout_ms : 300_000, cleanup_interval_ms : 60000, }
}
}
impl WebSocketPool
{
#[ must_use ]
#[ inline ]
pub fn new( config : WebSocketPoolConfig ) -> Self
{
Self
{
connections : HashMap::new(),
config,
}
}
#[ inline ]
pub fn add_connection( &mut self, connection : WebSocketConnection ) -> Result< (), String >
{
if self.connections.len() >= self.config.max_connections
{
return Err( "Connection pool is full".to_string() );
}
self.connections.insert( connection.id.clone(), connection );
Ok( () )
}
#[ must_use ]
#[ inline ]
pub fn get_connection( &self, id : &str ) -> Option< &WebSocketConnection >
{
self.connections.get( id )
}
pub fn get_connection_mut( &mut self, id : &str ) -> Option< &mut WebSocketConnection >
{
self.connections.get_mut( id )
}
pub fn remove_connection( &mut self, id : &str ) -> Option< WebSocketConnection >
{
self.connections.remove( id )
}
#[ must_use ]
#[ inline ]
pub fn connection_ids( &self ) -> Vec< String >
{
self.connections.keys().cloned().collect()
}
#[ must_use ]
pub fn active_connection_count( &self ) -> usize
{
self.connections.values().filter( | conn | conn.is_active() ).count()
}
pub fn cleanup_idle_connections( &mut self ) -> Vec< String >
{
let idle_timeout = Duration::from_millis( self.config.idle_timeout_ms );
let mut removed = Vec::new();
self.connections.retain( | id, conn |
{
if conn.idle_duration() > idle_timeout
{
removed.push( id.clone() );
false
}
else
{
true
}
});
removed
}
}
#[ derive( Debug, Clone ) ]
pub enum WebSocketEvent
{
Connected
{
connection_id : String,
},
Disconnected
{
connection_id : String,
reason : Option< String >,
},
MessageReceived
{
connection_id : String,
message : WebSocketMessage,
},
MessageSent
{
connection_id : String,
message : WebSocketMessage,
},
Error
{
connection_id : String,
error : String,
},
}
#[ derive( Debug ) ]
pub struct WebSocketStreamer;
impl WebSocketStreamer
{
#[ must_use ]
pub fn create_event_notifier() -> ( WebSocketEventSender, WebSocketEventReceiver )
{
let ( tx, rx ) = mpsc::unbounded_channel();
( WebSocketEventSender { sender : tx }, WebSocketEventReceiver { receiver : rx } )
}
#[ must_use ]
pub fn create_message_channel() -> ( WebSocketMessageSender, WebSocketMessageReceiver )
{
let ( tx, rx ) = mpsc::unbounded_channel();
( WebSocketMessageSender { sender : tx }, WebSocketMessageReceiver { receiver : rx } )
}
#[ must_use ]
pub fn create_state_watcher( initial_state : WebSocketState ) -> ( watch::Sender< WebSocketState >, watch::Receiver< WebSocketState > )
{
watch ::channel( initial_state )
}
pub fn validate_config( config : &WebSocketConfig ) -> Result< (), String >
{
if config.connect_timeout_ms == 0
{
return Err( "connect_timeout_ms must be greater than 0".to_string() );
}
if config.max_message_size == 0
{
return Err( "max_message_size must be greater than 0".to_string() );
}
if config.heartbeat_interval_ms == 0
{
return Err( "heartbeat_interval_ms must be greater than 0".to_string() );
}
if config.max_queue_size == 0
{
return Err( "max_queue_size must be greater than 0".to_string() );
}
Ok( () )
}
#[ must_use ]
pub fn create_heartbeat_timer( interval : Duration ) -> mpsc::UnboundedReceiver< Instant >
{
let ( tx, rx ) = mpsc::unbounded_channel();
tokio ::spawn( async move
{
let mut ticker = tokio::time::interval( interval );
loop
{
ticker.tick().await;
if tx.send( Instant::now() ).is_err()
{
break;
}
}
});
rx
}
#[ must_use ]
pub fn calculate_reconnect_delay( attempt : u32, base_delay_ms : u64, max_delay_ms : u64 ) -> Duration
{
let base_delay = Duration::from_millis( base_delay_ms );
let max_delay = Duration::from_millis( max_delay_ms );
let multiplier = 2_u64.saturating_pow( attempt );
let calculated_delay = base_delay.saturating_mul( u32::try_from( multiplier ).unwrap_or( u32::MAX ) );
core ::cmp::min( calculated_delay, max_delay )
}
#[ must_use ]
pub fn process_message_queue( connection : &WebSocketConnection, max_messages : usize ) -> Vec< WebSocketMessage >
{
let mut messages = Vec::new();
for _ in 0..max_messages
{
if let Some( message ) = connection.dequeue_message()
{
messages.push( message );
}
else
{
break;
}
}
messages
}
#[ must_use ]
pub fn connection_statistics( connection : &WebSocketConnection ) -> WebSocketConnectionStats
{
WebSocketConnectionStats
{
connection_id : connection.id.clone(),
state : connection.state.clone(),
connected_duration : connection.connection_duration(),
idle_duration : connection.idle_duration(),
queue_size : connection.queue_size(),
last_activity : connection.last_activity,
}
}
}
#[ derive( Debug, Clone ) ]
pub struct WebSocketConnectionStats
{
pub connection_id : String,
pub state : WebSocketState,
pub connected_duration : Option< Duration >,
pub idle_duration : Duration,
pub queue_size : usize,
pub last_activity : Instant,
}
#[ derive( Debug, Clone ) ]
pub struct WebSocketEventSender
{
sender : mpsc::UnboundedSender< WebSocketEvent >,
}
impl WebSocketEventSender
{
pub fn send_event( &self, event : WebSocketEvent ) -> Result< (), &'static str >
{
self.sender.send( event ).map_err( | _ | "Failed to send WebSocket event" )
}
pub fn send_connected( &self, connection_id : String ) -> Result< (), &'static str >
{
self.send_event( WebSocketEvent::Connected { connection_id } )
}
pub fn send_disconnected( &self, connection_id : String, reason : Option< String > ) -> Result< (), &'static str >
{
self.send_event( WebSocketEvent::Disconnected { connection_id, reason } )
}
pub fn send_message_received( &self, connection_id : String, message : WebSocketMessage ) -> Result< (), &'static str >
{
self.send_event( WebSocketEvent::MessageReceived { connection_id, message } )
}
pub fn send_error( &self, connection_id : String, error : String ) -> Result< (), &'static str >
{
self.send_event( WebSocketEvent::Error { connection_id, error } )
}
}
#[ derive( Debug ) ]
pub struct WebSocketEventReceiver
{
receiver : mpsc::UnboundedReceiver< WebSocketEvent >,
}
impl WebSocketEventReceiver
{
pub fn try_recv( &mut self ) -> Option< WebSocketEvent >
{
self.receiver.try_recv().ok()
}
pub async fn recv( &mut self ) -> Option< WebSocketEvent >
{
self.receiver.recv().await
}
}
#[ derive( Debug, Clone ) ]
pub struct WebSocketMessageSender
{
sender : mpsc::UnboundedSender< WebSocketMessage >,
}
impl WebSocketMessageSender
{
pub fn send_message( &self, message : WebSocketMessage ) -> Result< (), &'static str >
{
self.sender.send( message ).map_err( | _ | "Failed to send WebSocket message" )
}
pub fn send_text( &self, text : String ) -> Result< (), &'static str >
{
self.send_message( WebSocketMessage::Text( text ) )
}
pub fn send_binary( &self, data : Vec< u8 > ) -> Result< (), &'static str >
{
self.send_message( WebSocketMessage::Binary( data ) )
}
pub fn send_ping( &self, data : Vec< u8 > ) -> Result< (), &'static str >
{
self.send_message( WebSocketMessage::Ping( data ) )
}
}
#[ derive( Debug ) ]
pub struct WebSocketMessageReceiver
{
receiver : mpsc::UnboundedReceiver< WebSocketMessage >,
}
impl WebSocketMessageReceiver
{
pub fn try_recv( &mut self ) -> Option< WebSocketMessage >
{
self.receiver.try_recv().ok()
}
pub async fn recv( &mut self ) -> Option< WebSocketMessage >
{
self.receiver.recv().await
}
}
}
crate ::mod_interface!
{
exposed use private::WebSocketState;
exposed use private::WebSocketMessage;
exposed use private::WebSocketConfig;
exposed use private::WebSocketConnection;
exposed use private::WebSocketPool;
exposed use private::WebSocketPoolConfig;
exposed use private::WebSocketEvent;
exposed use private::WebSocketStreamer;
exposed use private::WebSocketConnectionStats;
exposed use private::WebSocketEventSender;
exposed use private::WebSocketEventReceiver;
exposed use private::WebSocketMessageSender;
exposed use private::WebSocketMessageReceiver;
}