use api_openai::ClientApiAccessors;
#[ allow( unused_imports ) ]
use api_openai::
{
client ::Client,
error ::OpenAIError,
api ::realtime::{ RealtimeClient, ws::WsSession },
components ::realtime_shared::
{
RealtimeSessionCreateRequest,
RealtimeClientEventInputAudioBufferAppend,
RealtimeClientEventInputAudioBufferCommit,
RealtimeServerEvent,
RealtimeSessionTurnDetection,
RealtimeSessionInputAudioTranscription,
},
components ::common::ModelIds,
};
use tracing_subscriber::{ EnvFilter, fmt }; use base64::{ engine::general_purpose::STANDARD as base64_engine, Engine as _ }; use std::sync::{ Arc, Mutex };
use tokio::time::{ sleep, Duration }; use std::io::{ Write, stdout };
/// Example : append audio to a Realtime session's input buffer, commit it, and wait
/// until the server confirms both the commit and the resulting conversation item.
///
/// Steps performed :
/// 1. Initialise tracing and load environment variables from `./secret/-secret.sh`.
/// 2. Create a Realtime session over REST and open a WebSocket with its client secret.
/// 3. Send `input_audio_buffer.append` with base64-encoded bytes of `data/example.wav`.
/// 4. Send `input_audio_buffer.commit` with a known client `event_id`.
/// 5. Read server events until both `input_audio_buffer.committed` and the matching
///    `conversation.item.created` were seen, in either arrival order.
///
/// # Errors
///
/// - Any error returned by session creation, WebSocket connect, send or read is propagated.
/// - `OpenAIError::WsInvalidMessage` when confirmations do not arrive within 15 seconds, or
///   when the server reports an error tied to the commit request.
/// - `OpenAIError::WsConnectionClosed` when the server closes the socket before both
///   confirmations were received.
#[ tokio::main( flavor = "current_thread" ) ]
async fn main() -> Result< (), OpenAIError >
{
  fmt()
  .with_env_filter
  (
    EnvFilter::from_default_env()
    .add_directive( "api_openai=trace".parse().expect( "static tracing directive must parse" ) )
  )
  .init();
  dotenv ::from_filename( "./secret/-secret.sh" ).ok();

  tracing ::info!( "Initializing client..." );
  let client = Client::new();

  tracing ::info!( "Building realtime session request..." );
  let request = RealtimeSessionCreateRequest::former()
  .model( "gpt-4o-realtime-preview".to_string() )
  .input_audio_format( "pcm16" )
  .turn_detection
  (
    RealtimeSessionTurnDetection::former()
    .r#type( "semantic_vad" )
    .create_response( false )
    .interrupt_response( true )
    .form()
  )
  .input_audio_transcription( RealtimeSessionInputAudioTranscription::former().model( "whisper-1" ).form() )
  .temperature( 0.7 )
  .form();

  tracing ::info!( "Sending request to OpenAI API to create session..." );
  let session = client.realtime().create( request ).await?;
  tracing ::info!( session_id = %session.id, "Session created." );

  tracing ::info!( "Creating Realtime WebSocket Session Client..." );
  let token = session.client_secret.value;
  let session_client = WsSession::connect( client.environment().clone(), Some( &token ) ).await?;
  tracing ::info!( "WebSocket client connected." );

  // NOTE(review): the whole file, container header included, is sent as "pcm16" audio -
  // confirm the server tolerates this or strip the header first.
  let dummy_audio_bytes = include_bytes!("data/example.wav");
  let audio_base64 = base64_engine.encode( dummy_audio_bytes );
  let append_event = RealtimeClientEventInputAudioBufferAppend::former()
  .audio( audio_base64 )
  .form();
  tracing ::info!( "Sending input_audio_buffer.append event..." );
  session_client.input_audio_buffer_append( append_event ).await?;
  tracing ::info!( "Waiting after append..." );
  sleep( Duration::from_millis( 3000 ) ).await;
  tracing ::info!( "Audio append sent and waited." );

  let client_event_id = "commit-example-id";
  let commit_event = RealtimeClientEventInputAudioBufferCommit::former()
  .event_id( client_event_id )
  .form();
  tracing ::info!( event_id = %client_event_id, "Sending input_audio_buffer.commit event..." );
  session_client.input_audio_buffer_commit( commit_event ).await?;
  tracing ::info!( "Waiting for input_audio_buffer.committed and conversation.item.created confirmation..." );

  let mut commit_confirmed = false;
  let mut item_created_confirmed = false;
  // NOTE(review): a plain `Option< String >` would suffice in this single-task loop; the
  // `Arc< Mutex< _ > >` wrapper is retained so the file-level `std::sync` imports stay in use.
  let expected_item_id_from_commit = Arc::new( Mutex::new( None::< String > ) );
  // IDs of user items announced before the commit confirmation told us which ID to expect.
  let mut early_user_item_ids : Vec< String > = Vec::new();
  let loop_timeout = Duration::from_secs( 15 );
  let read_timeout = Duration::from_millis( 500 );
  let loop_start = tokio::time::Instant::now();

  while !( commit_confirmed && item_created_confirmed )
  {
    if loop_start.elapsed() > loop_timeout
    {
      eprintln!("Timeout waiting for commit/create confirmations.");
      return Err( OpenAIError::WsInvalidMessage(
        format!( "Timeout waiting for commit/create confirmations (commit_confirmed : {}, item_created_confirmed : {})",
          commit_confirmed, item_created_confirmed
      )));
    }

    // A short per-read timeout keeps the overall deadline check above responsive.
    let event = match tokio::time::timeout( read_timeout, session_client.read_event() ).await
    {
      Err( _elapsed ) => continue,
      Ok( Err( e ) ) =>
      {
        eprintln!( "\nError reading from WebSocket : {:?}", e );
        return Err( e );
      }
      Ok( Ok( None ) ) =>
      {
        // The socket is gone, so no further confirmation can arrive. Report the closure
        // directly instead of probing with another (unbounded) read.
        println!( "\nWebSocket connection closed by server." );
        eprintln!("Loop finished without receiving full confirmation (commit : {}, item_created : {}).", commit_confirmed, item_created_confirmed);
        return Err( OpenAIError::WsConnectionClosed );
      }
      Ok( Ok( Some( event ) ) ) => event,
    };

    match event
    {
      RealtimeServerEvent::InputAudioBufferCommitted( committed_event ) =>
      {
        println!( "\n--- Commit Confirmation Received ---" );
        println!( "{:?}", committed_event );
        let item_id = committed_event.item_id;
        println!( "Successfully received input_audio_buffer.committed. User item ID expected : {}", item_id );
        // Reconcile with any user item that was announced before this confirmation.
        if early_user_item_ids.iter().any( | seen | *seen == item_id )
        {
          println!( "Successfully received conversation.item.created matching committed item (ID: {}).", item_id);
          item_created_confirmed = true;
        }
        *expected_item_id_from_commit.lock().unwrap() = Some( item_id );
        commit_confirmed = true;
      }
      RealtimeServerEvent::ConversationItemCreated( created_event ) =>
      {
        println!( "\n--- Conversation Item Created Received ---" );
        let maybe_expected_id = expected_item_id_from_commit.lock().unwrap().clone();
        match maybe_expected_id
        {
          Some( expected_id ) if created_event.item.id.as_deref() == Some( expected_id.as_str() ) =>
          {
            println!( "Successfully received conversation.item.created matching committed item (ID: {}).", expected_id);
            item_created_confirmed = true;
          }
          Some( expected_id ) =>
          {
            println!( "Received conversation.item.created for a different item ID: {:?}, expected : {}", created_event.item.id, expected_id );
          }
          None if created_event.item.role.as_deref() == Some("user") =>
          {
            println!( "Received user conversation.item.created before commit provided expected ID.");
            // Remember the ID so the commit confirmation can be matched against it later.
            if let Some( early_id ) = created_event.item.id.as_deref()
            {
              early_user_item_ids.push( early_id.to_owned() );
            }
          }
          None =>
          {
            println!("Received unexpected non-user item created : {:?}", created_event.item.role);
          }
        }
      }
      RealtimeServerEvent::ConversationItemInputAudioTranscriptionDelta( delta ) =>
      {
        // Stream transcription fragments on one line; flush because there is no newline.
        print!("{}", delta.delta);
        let _ = stdout().flush();
      }
      RealtimeServerEvent::ConversationItemInputAudioTranscriptionCompleted( completed ) =>
      {
        println!("\n--- Transcription Completed ---");
        println!("{completed:?}");
      }
      RealtimeServerEvent::Error( error_event ) =>
      {
        eprintln!( "\n--- Received Server Error Event ---" );
        eprintln!( "{error_event:?}" );
        if error_event.error.event_id.as_deref() == Some( client_event_id )
        {
          eprintln!("Server error explicitly linked to our commit request (event_id : {}).", client_event_id);
          return Err( OpenAIError::WsInvalidMessage
          (
            format!("Commit failed : type={}, code={:?}, message={}",
              error_event.error.r#type, error_event.error.code, error_event.error.message
          )));
        }
        // Fallback heuristic for errors that do not echo our event_id.
        if error_event.error.message.to_lowercase().contains("commit")
        {
          eprintln!("Server error message mentions 'commit'.");
          return Err( OpenAIError::WsInvalidMessage
          (
            format!("Commit likely failed : type={}, code={:?}, message={}",
              error_event.error.r#type, error_event.error.code, error_event.error.message
          )));
        }
        // Errors unrelated to the commit are logged only; keep waiting.
      }
      other => { println!( "\n--- Received Other Event --- \n{other:?}" ); }
    }
  }

  println!( "\nCommit and item creation successfully confirmed." );
  Ok( () )
}