use super::protocol::*;
use crate::error::Error;
use crate::models::websocket_streaming::*;
use std::collections::HashMap;
use std::sync::{ Arc, RwLock };
use std::sync::atomic::{ AtomicU64, AtomicBool, Ordering };
use std::time::Instant;
use tokio::sync::{ mpsc, broadcast };
/// Registry of WebSocket streaming sessions, keyed by session id.
///
/// All mutable state sits behind `Arc`-wrapped locks or atomics, so every
/// method on the manager takes `&self`.
#[ derive( Debug ) ]
pub struct WebSocketConnectionManager
{
/// Registered sessions, indexed by the `ws_session_<n>` id generated in `create_session`.
connections : Arc< RwLock< HashMap< String, Arc< WebSocketStreamSession > > > >,
/// Pool configuration supplied at construction and exposed through `get_pool_config`.
pool_config : WebSocketPoolConfig,
/// Aggregate metrics; `get_metrics` returns a clone of the current value.
global_metrics : Arc< RwLock< WebSocketMetrics > >,
/// Monotonic counter used to build unique session ids.
connection_counter : Arc< AtomicU64 >,
/// Set to `true` by `start` and back to `false` by `stop`.
is_running : Arc< AtomicBool >,
}
/// A single WebSocket streaming session: one connection plus the channels,
/// state, configuration and metrics that belong to it.
#[ derive( Debug ) ]
pub struct WebSocketStreamSession
{
/// Identifier under which the session is registered in the manager.
pub session_id : String,
/// Current lifecycle state; read with `get_state`, replaced with `set_state`.
state : Arc< RwLock< StreamSessionState > >,
/// Underlying connection; shut down by `close`.
connection : WebSocketConnection,
/// Sending half of the session's unbounded message channel, used by `send_message`.
message_sender : mpsc::UnboundedSender< WebSocketStreamMessage >,
/// Receiving half of the message channel, handed out via `get_message_receiver`.
/// NOTE(review): the `Option` presumably lets a consumer take ownership of the
/// receiver; nothing in the code visible here does so - confirm against callers.
message_receiver : Arc< RwLock< Option< mpsc::UnboundedReceiver< WebSocketStreamMessage > > > >,
/// Broadcast sender from which `subscribe` creates new receivers.
broadcast_sender : broadcast::Sender< WebSocketStreamMessage >,
/// Configuration the session was created with; exposed through `get_config`.
config : WebSocketConfig,
/// Per-session counters updated by `send_message`.
metrics : Arc< RwLock< SessionMetrics > >,
/// Monotonic timestamp captured when the session was constructed.
created_at : Instant,
}
/// Issues lifecycle commands (start, pause, resume, stop, reset) to one session.
#[ derive( Debug ) ]
pub struct StreamController
{
/// Session whose state is updated and to which control messages are sent.
session : Arc< WebSocketStreamSession >,
/// Sending half of a controller-local control channel created in `new`.
/// NOTE(review): `new` does not retain the matching receiver, so nothing can
/// ever read from this channel.
control_sender : mpsc::UnboundedSender< StreamControl >,
}
impl WebSocketConnectionManager
{
  /// Creates a manager with an empty session registry, default metrics, a
  /// session counter starting at zero and the running flag cleared.
  pub fn new( pool_config : WebSocketPoolConfig ) -> Self
  {
    Self
    {
      connections : Arc::new( RwLock::new( HashMap::new() ) ),
      pool_config,
      global_metrics : Arc::new( RwLock::new( WebSocketMetrics::default() ) ),
      connection_counter : Arc::new( AtomicU64::new( 0 ) ),
      is_running : Arc::new( AtomicBool::new( false ) ),
    }
  }

  /// Marks the manager as running.
  ///
  /// # Errors
  ///
  /// Currently never fails; the `Result` is kept for interface stability.
  pub async fn start( &self ) -> Result< (), Error >
  {
    self.is_running.store( true, Ordering::Relaxed );
    Ok( () )
  }

  /// Marks the manager as stopped, empties the registry and closes every
  /// session that was registered.
  ///
  /// Closing is best-effort: an error from one session is ignored so that the
  /// remaining sessions are still closed.
  ///
  /// # Errors
  ///
  /// Currently never fails; the `Result` is kept for interface stability.
  pub async fn stop( &self ) -> Result< (), Error >
  {
    self.is_running.store( false, Ordering::Relaxed );

    // Move the sessions out while holding the lock, then release the guard
    // before awaiting. A `std::sync` guard held across `.await` makes the
    // future `!Send` and blocks every other registry user during network I/O.
    let sessions : Vec< Arc< WebSocketStreamSession > > =
    {
      let mut connections = self.connections.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
      let drained : Vec< _ > = connections.drain().map( | ( _session_id, session ) | session ).collect();
      drained
    };

    for session in sessions
    {
      let _ = session.close().await;
    }
    Ok( () )
  }

  /// Connects to `endpoint`, wraps the connection in a new session and
  /// registers it under a freshly generated `ws_session_<n>` id.
  ///
  /// Returns the id of the registered session.
  ///
  /// # Errors
  ///
  /// Propagates the error returned by `WebSocketConnection::connect`.
  pub async fn create_session( &self, endpoint : &str, config : WebSocketConfig ) -> Result< String, Error >
  {
    let session_id = format!( "ws_session_{}", self.connection_counter.fetch_add( 1, Ordering::Relaxed ) );
    let connection = WebSocketConnection::connect( endpoint, config.clone() ).await?;
    let ( message_sender, message_receiver ) = mpsc::unbounded_channel();
    // The initial broadcast receiver is not needed: `subscribe` creates receivers on demand.
    let ( broadcast_sender, _broadcast_receiver ) = broadcast::channel( 1000 );

    let session = Arc::new( WebSocketStreamSession
    {
      session_id : session_id.clone(),
      state : Arc::new( RwLock::new( StreamSessionState::Initializing ) ),
      connection,
      message_sender,
      message_receiver : Arc::new( RwLock::new( Some( message_receiver ) ) ),
      broadcast_sender,
      config,
      metrics : Arc::new( RwLock::new( SessionMetrics::default() ) ),
      created_at : Instant::now(),
    } );

    // Recover from a poisoned lock instead of skipping the insert: otherwise the
    // caller would receive an id for a session that was never registered.
    self.connections
    .write()
    .unwrap_or_else( | poisoned | poisoned.into_inner() )
    .insert( session_id.clone(), session );

    Ok( session_id )
  }

  /// Looks up a session by id, returning a shared handle when it is registered.
  pub fn get_session( &self, session_id : &str ) -> Option< Arc< WebSocketStreamSession > >
  {
    let connections = self.connections.read().unwrap_or_else( | poisoned | poisoned.into_inner() );
    connections.get( session_id ).cloned()
  }

  /// Removes the session with the given id from the registry and closes it.
  ///
  /// Removing an unknown id is a no-op.
  ///
  /// # Errors
  ///
  /// Propagates the error returned by the session's `close`; the session has
  /// already been removed from the registry at that point.
  pub async fn remove_session( &self, session_id : &str ) -> Result< (), Error >
  {
    // Release the registry lock before awaiting the close (see `stop`).
    let removed =
    {
      let mut connections = self.connections.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
      let taken = connections.remove( session_id );
      taken
    };

    if let Some( session ) = removed
    {
      session.close().await?;
    }
    Ok( () )
  }

  /// Returns a clone of the manager-wide metrics.
  pub fn get_metrics( &self ) -> WebSocketMetrics
  {
    self.global_metrics.read().unwrap_or_else( | poisoned | poisoned.into_inner() ).clone()
  }

  /// Returns the ids of all registered sessions, in unspecified order.
  pub fn list_sessions( &self ) -> Vec< String >
  {
    let connections = self.connections.read().unwrap_or_else( | poisoned | poisoned.into_inner() );
    connections.keys().cloned().collect()
  }

  /// Returns the pool configuration the manager was constructed with.
  pub fn get_pool_config( &self ) -> &WebSocketPoolConfig
  {
    &self.pool_config
  }
}
impl WebSocketStreamSession
{
  /// Queues `message` on the session's message channel and records the send in
  /// the session metrics.
  ///
  /// # Errors
  ///
  /// Returns `Error::ServerError` when the channel rejects the message.
  pub async fn send_message( &self, message : WebSocketStreamMessage ) -> Result< (), Error >
  {
    self.message_sender.send( message )
    .map_err( | e | Error::ServerError( format!( "Failed to send message : {}", e ) ) )?;

    // Recover from poisoning so the counters stay consistent with `get_metrics`,
    // which also reads through a poisoned lock.
    let mut metrics = self.metrics.write().unwrap_or_else( | poisoned | poisoned.into_inner() );
    metrics.messages_sent += 1;
    // A system clock set before the Unix epoch must not panic the sender; in
    // that case the previous activity timestamp is left untouched.
    if let Ok( since_epoch ) = std::time::SystemTime::now().duration_since( std::time::UNIX_EPOCH )
    {
      metrics.last_activity = Some( since_epoch.as_secs() );
    }
    Ok( () )
  }

  /// Creates a new receiver on the session's broadcast channel.
  pub fn subscribe( &self ) -> broadcast::Receiver< WebSocketStreamMessage >
  {
    self.broadcast_sender.subscribe()
  }

  /// Returns a clone of the current session state.
  pub fn get_state( &self ) -> StreamSessionState
  {
    self.state.read().unwrap_or_else( | poisoned | poisoned.into_inner() ).clone()
  }

  /// Replaces the session state with `new_state`.
  ///
  /// A poisoned lock is recovered rather than ignored, so that a transition
  /// such as the one performed by `close` is always recorded.
  pub fn set_state( &self, new_state : StreamSessionState )
  {
    *self.state.write().unwrap_or_else( | poisoned | poisoned.into_inner() ) = new_state;
  }

  /// Returns a clone of the session metrics.
  pub fn get_metrics( &self ) -> SessionMetrics
  {
    self.metrics.read().unwrap_or_else( | poisoned | poisoned.into_inner() ).clone()
  }

  /// Returns the configuration the session was created with.
  pub fn get_config( &self ) -> &WebSocketConfig
  {
    &self.config
  }

  /// Returns the instant at which the session was constructed.
  pub fn get_created_at( &self ) -> Instant
  {
    self.created_at
  }

  /// Returns the shared slot holding the receiving half of the message channel.
  pub fn get_message_receiver( &self ) -> &Arc< RwLock< Option< mpsc::UnboundedReceiver< WebSocketStreamMessage > > > >
  {
    &self.message_receiver
  }

  /// Marks the session as `Terminated` and closes the underlying connection.
  ///
  /// # Errors
  ///
  /// Propagates the error returned by the connection's `close`; the state has
  /// already been set to `Terminated` at that point.
  pub async fn close( &self ) -> Result< (), Error >
  {
    self.set_state( StreamSessionState::Terminated );
    self.connection.close().await
  }
}
impl StreamController
{
  /// Creates a controller for `session`.
  ///
  /// The receiving half of the controller-local control channel is not
  /// retained, so that channel has no consumer; commands reach the stream
  /// through the session's message channel (see `send_control`).
  pub fn new( session : Arc< WebSocketStreamSession > ) -> Self
  {
    let ( control_sender, _control_receiver ) = mpsc::unbounded_channel();
    Self
    {
      session,
      control_sender,
    }
  }

  /// Sets the session state to `Active` and sends `StreamControl::Start`.
  ///
  /// # Errors
  ///
  /// Returns the error from the session when the control message cannot be sent.
  pub async fn start( &self ) -> Result< (), Error >
  {
    self.session.set_state( StreamSessionState::Active );
    self.send_control( StreamControl::Start ).await
  }

  /// Sets the session state to `Paused` and sends `StreamControl::Pause`.
  ///
  /// # Errors
  ///
  /// Returns the error from the session when the control message cannot be sent.
  pub async fn pause( &self ) -> Result< (), Error >
  {
    self.session.set_state( StreamSessionState::Paused );
    self.send_control( StreamControl::Pause ).await
  }

  /// Sets the session state to `Active` and sends `StreamControl::Resume`.
  ///
  /// # Errors
  ///
  /// Returns the error from the session when the control message cannot be sent.
  pub async fn resume( &self ) -> Result< (), Error >
  {
    self.session.set_state( StreamSessionState::Active );
    self.send_control( StreamControl::Resume ).await
  }

  /// Sets the session state to `Terminated` and sends `StreamControl::Stop`.
  ///
  /// # Errors
  ///
  /// Returns the error from the session when the control message cannot be sent.
  pub async fn stop( &self ) -> Result< (), Error >
  {
    self.session.set_state( StreamSessionState::Terminated );
    self.send_control( StreamControl::Stop ).await
  }

  /// Sets the session state to `Initializing` and sends `StreamControl::Reset`.
  ///
  /// # Errors
  ///
  /// Returns the error from the session when the control message cannot be sent.
  pub async fn reset( &self ) -> Result< (), Error >
  {
    self.session.set_state( StreamSessionState::Initializing );
    self.send_control( StreamControl::Reset ).await
  }

  /// Delivers `command` to the session as a `WebSocketStreamMessage::Control`.
  async fn send_control( &self, command : StreamControl ) -> Result< (), Error >
  {
    // The local control channel is best-effort. Its receiver is dropped in
    // `new`, so this send reports a closed channel; treating that as fatal made
    // every command fail before it was forwarded to the session.
    let _ = self.control_sender.send( command.clone() );

    let control_message = WebSocketStreamMessage::Control
    {
      command,
      metadata : None,
    };
    self.session.send_message( control_message ).await
  }

  /// Returns a clone of the controlled session's metrics.
  pub fn get_session_metrics( &self ) -> SessionMetrics
  {
    self.session.get_metrics()
  }
}