mod private
{
use crate::
{
client ::Client,
error ::{ OpenAIError, Result },
environment ::{ OpenaiEnvironment, EnvironmentInterface },
};
use crate::components::realtime_shared:: {
RealtimeClientEvent,
RealtimeServerEvent,
RealtimeSession,
RealtimeSessionCreateRequest,
RealtimeSessionCreateResponse,
RealtimeTranscriptionSessionCreateRequest,
RealtimeTranscriptionSessionCreateResponse,
};
use tokio::sync::mpsc;
use tokio::
{
net ::TcpStream,
sync ::{ Mutex }, };
use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream };
use futures_util::
{
StreamExt as _, SinkExt as _, };
use serde_json;
use std::sync::Arc;
#[ derive( Debug, Clone ) ]
pub struct Realtime< 'client, E >
where
E : OpenaiEnvironment + EnvironmentInterface + Send + Sync + 'static,
{
client : &'client Client< E >,
}
impl< 'client, E > Realtime< 'client, E >
where
E : OpenaiEnvironment + EnvironmentInterface + Send + Sync + 'static,
{
pub(crate) fn new( client : &'client Client< E > ) -> Self
{
Self { client }
}
#[ inline ]
pub async fn create_session( &self, request : RealtimeSessionCreateRequest ) -> Result< RealtimeSessionCreateResponse >
{
self.client.post( "realtime/sessions", &request ).await
}
#[ inline ]
pub async fn retrieve_session( &self, session_id : &str ) -> Result< RealtimeSession >
{
let path = format!( "/realtime/sessions/{session_id}" );
self.client.get( &path ).await
}
#[ inline ]
pub async fn update_session( &self, session_id : &str, request : serde_json::Value ) -> Result< RealtimeSession >
{
let path = format!( "/realtime/sessions/{session_id}" );
self.client.post( &path, &request ).await
}
#[ inline ]
pub async fn delete_session( &self, session_id : &str ) -> Result< serde_json::Value >
{
let path = format!( "/realtime/sessions/{session_id}" );
self.client.delete( &path ).await
}
#[ inline ]
pub async fn create_transcription_session( &self, request : RealtimeTranscriptionSessionCreateRequest ) -> Result< RealtimeTranscriptionSessionCreateResponse >
{
self.client.post( "realtime/transcription_sessions", &request ).await
}
#[ inline ]
pub async fn update_transcription_session( &self, session_id : &str, request : serde_json::Value ) -> Result< RealtimeTranscriptionSessionCreateResponse >
{
let path = format!( "/realtime/transcription_sessions/{session_id}" );
self.client.post( &path, &request ).await
}
#[ inline ]
pub async fn delete_transcription_session( &self, session_id : &str ) -> Result< serde_json::Value >
{
let path = format!( "/realtime/transcription_sessions/{session_id}" );
self.client.delete( &path ).await
}
#[ inline ]
pub async fn connect_ws( &self, session_id : &str ) -> Result< WsSession >
{
let url = self.client.environment.join_realtime_base_url( &format!( "sessions/{session_id}/events" ) )?;
WsSession::connect( url.as_str() ).await
}
}
#[ derive( Debug ) ]
pub enum HandlerMessage
{
Message( String ),
Error( OpenAIError ),
Closed,
}
#[ derive( Debug, Clone ) ]
pub struct WsSession
{
pub rx : Arc< tokio::sync::Mutex< tokio::sync::mpsc::Receiver< HandlerMessage > > >, pub tx : Arc< tokio::sync::mpsc::Sender< HandlerMessage > >,
pub ws_stream : Arc< tokio::sync::Mutex< WebSocketStream< MaybeTlsStream< TcpStream > > > >,
}
impl WsSession
{
#[ inline ]
pub async fn connect( url : &str ) -> Result< Self >
{
let ( ws_stream, _ ) = tokio_tungstenite::connect_async( url )
.await
.map_err( | e | OpenAIError::Ws( e.to_string() ) )?;
let ws_stream_arc = Arc::new( tokio::sync::Mutex::new( ws_stream ) );
let ( tx, rx ) = mpsc::channel( 100 );
let rx_arc = Arc::new( Mutex::new( rx ) ); let tx_arc = Arc::new( tx );
let ws_stream_locked = Arc::< _ >::clone( &ws_stream_arc );
let tx_clone = Arc::< _ >::clone( &tx_arc );
tokio ::spawn( async move
{
let mut ws_stream_locked = ws_stream_locked.lock().await;
loop
{
tokio ::select!
{
msg = ws_stream_locked.next() =>
{
match msg
{
Some( Ok( msg ) ) =>
{
if msg.is_text()
{
let message = msg.to_string();
let _ = tx_clone.send( HandlerMessage::Message( message ) ).await.ok();
}
},
Some( Err( error ) ) =>
{
let _ = tx_clone.send( HandlerMessage::Error( OpenAIError::Ws( error.to_string() ) ) ).await.ok(); break;
},
None =>
{
let _ = tx_clone.send( HandlerMessage::Closed ).await.ok();
break;
},
}
},
_unit = tokio::time::sleep( tokio::time::Duration::from_secs( 1 ) ) =>
{
}
}
}
});
Ok( Self
{
ws_stream : ws_stream_arc,
tx : tx_arc, rx : rx_arc,
})
}
#[ inline ]
pub async fn send_event( &self, event : RealtimeClientEvent ) -> Result< () >
{
let message = serde_json::to_string( &event )
.map_err( | e | OpenAIError::Internal( format!( "Serialization error : {e}" ) ) )?;
let mut ws_stream_locked = self.ws_stream.lock().await;
ws_stream_locked.send( tokio_tungstenite::tungstenite::Message::Text( message.into() ) ) .await
.map_err( | e | OpenAIError::Ws( e.to_string() ) )?; Ok( () )
}
#[ inline ]
pub async fn recv_event( &self ) -> Result< RealtimeServerEvent >
{
match self.rx.lock().await.recv().await {
Some( HandlerMessage::Message( message ) ) =>
{
serde_json ::from_str( &message )
.map_err( | e | OpenAIError::Internal( format!( "Deserialization error : {e}" ) ).into() )
},
Some( HandlerMessage::Error( error ) ) => Err( error.into() ),
Some( HandlerMessage::Closed ) | None => Err( OpenAIError::Ws( tokio_tungstenite::tungstenite::Error::ConnectionClosed.to_string() ).into() ), }
}
}
}
crate ::mod_interface!
{
exposed use
{
Realtime,
WsSession,
HandlerMessage,
};
}