mod private
{
use crate::
{
client ::Client,
environment ::{ OpenaiEnvironment, EnvironmentInterface },
error ::{ OpenAIError, Result },
diagnostics ::{ RequestMetrics, ResponseMetrics },
};
use reqwest::Method;
use serde::{ de::DeserializeOwned, Serialize };
use futures_util::StreamExt;
use tokio::sync::mpsc;
use std::{ sync::Arc, time::Instant };
use eventsource_stream::Eventsource;
impl< E > Client< E >
where
E : OpenaiEnvironment + EnvironmentInterface + Send + Sync + 'static,
{
/// POST `body` as JSON to `path` and stream the Server-Sent-Events response.
///
/// Returns a bounded channel receiver; each SSE `data` payload is parsed as
/// `O` and forwarded as `Ok( O )`, while transport or parse failures arrive
/// as `Err( OpenAIError::Stream( .. ) )`. The stream ends when the sentinel
/// `[DONE]` event is received, the connection errors, or the receiver is
/// dropped.
#[ allow( clippy::unused_async ) ]
#[ inline ]
pub(in crate) async fn post_stream< I, O >( &self, path : &str, body : &I ) -> Result< mpsc::Receiver< Result< O > > >
where
  I : Serialize,
  O : DeserializeOwned + Send + 'static,
{
  let url = self.environment.join_base_url( path )?;
  let request = self.http_client.request( Method::POST, url ).json( body );
  let ( tx, rx ) = mpsc::channel( 100 );
  // `mpsc::Sender` is `Clone` and only this one task sends, so it can be
  // moved straight into the task -- wrapping it in an `Arc` is unnecessary.
  tokio::spawn( async move
  {
    let response = match request.send().await
    {
      Ok( res ) => res,
      Err( e ) =>
      {
        let _ = tx.send( Err( OpenAIError::Stream( e.to_string() ).into() ) ).await;
        return;
      }
    };
    let mut event_stream = response.bytes_stream().eventsource();
    while let Some( event_result ) = event_stream.next().await
    {
      match event_result
      {
        Ok( event ) =>
        {
          let data = &event.data;
          // Skip keep-alive / empty events.
          if data.is_empty()
          {
            continue;
          }
          // OpenAI terminates SSE streams with a literal "[DONE]" sentinel.
          if data == "[DONE]"
          {
            return;
          }
          match serde_json::from_str::< O >( data )
          {
            Ok( obj ) =>
            {
              // Receiver may have been dropped; ignore the send error.
              let _ = tx.send( Ok( obj ) ).await;
            },
            Err( e ) =>
            {
              let error_msg = format!( "Failed to parse JSON from SSE data '{data}': {e}" );
              let _ = tx.send( Err( OpenAIError::Stream( error_msg ).into() ) ).await;
            },
          }
        },
        Err( e ) =>
        {
          let error_msg = format!( "SSE streaming error : {e}" );
          let _ = tx.send( Err( OpenAIError::Stream( error_msg ).into() ) ).await;
          break;
        },
      }
    }
  });
  Ok( rx )
}
#[ inline ]
pub(in crate) async fn post_multipart< O >( &self, path : &str, form : reqwest::multipart::Form ) -> Result< O >
where
O : DeserializeOwned,
{
let url = self.environment.join_base_url( path )?;
let http_client = &self.http_client;
let start_time = Instant::now();
if let Some( diagnostics ) = &self.diagnostics
{
let request_metrics = RequestMetrics
{
timestamp : start_time,
method : "POST".to_string(),
endpoint : path.to_string(),
headers : if diagnostics.config.collection.request_headers
{
vec![ ( "Content-Type".to_string(), "multipart/form-data".to_string() ) ]
}
else
{
vec![]
},
body_size : 0, user_agent : "api_openai/0.2.0".to_string(),
};
diagnostics.record_request( &request_metrics );
}
let response = http_client.request( Method::POST, url ).multipart( form ).send().await;
let response = response.map_err( | e | OpenAIError::Network( e.to_string() ) )?;
if response.status().is_success()
{
let result : O = response.json().await.map_err( | e | OpenAIError::Internal( e.to_string() ) )?;
Ok( result )
}
else
{
let status = response.status();
let error_text = response.text().await.unwrap_or_else( | _ | "Unknown error".to_string() );
Err( OpenAIError::Api( crate::error::ApiError {
code : Some( status.as_u16().to_string() ),
message : error_text,
param : None,
r#type : Some( "http_error".to_string() ),
} ).into() )
}
}
#[ inline ]
pub(in crate) async fn post_binary< I >( &self, path : &str, body : &I ) -> Result< Vec< u8 > >
where
I: serde::Serialize + Sync,
{
let url = self.environment.join_base_url( path )?;
let http_client = &self.http_client;
let start_time = Instant::now();
if let Some( diagnostics ) = &self.diagnostics
{
let request_body_size = serde_json::to_vec( body ).map( |v| v.len() ).unwrap_or( 0 );
let request_metrics = RequestMetrics
{
timestamp : start_time,
method : "POST".to_string(),
endpoint : path.to_string(),
headers : if diagnostics.config.collection.request_headers
{
vec![ ( "Content-Type".to_string(), "application/json".to_string() ) ]
}
else
{
vec![]
},
body_size : request_body_size,
user_agent : "api_openai/0.2.0".to_string(),
};
diagnostics.record_request( &request_metrics );
}
let response = self.execute_request_with_retry( || {
http_client.request( Method::POST, url.clone() ).json( body ).send()
}).await;
match response
{
Ok( response ) =>
{
let status_code = response.status().as_u16();
let response_time = start_time.elapsed();
if response.status().is_success()
{
let bytes = response.bytes().await
.map_err( | e | OpenAIError::Internal( format!( "Failed to read response bytes : {e}" ) ) )?;
if let Some( diagnostics ) = &self.diagnostics
{
let response_metrics = ResponseMetrics
{
timestamp : start_time,
status_code,
response_time,
body_size : bytes.len(),
headers : if diagnostics.config.collection.response_headers
{
vec![ ( "Content-Type".to_string(), "application/octet-stream".to_string() ) ]
}
else
{
vec![]
},
tokens_used : None,
};
diagnostics.record_response( &response_metrics );
}
Ok( bytes.to_vec() )
}
else
{
let error_text = response.text().await.unwrap_or_else( | _ | "Unknown error".to_string() );
Err( OpenAIError::Api( crate::error::ApiError {
code : Some( status_code.to_string() ),
message : error_text,
param : None,
r#type : Some( "http_error".to_string() ),
} ).into() )
}
}
Err( e ) => Err( e )
}
}
#[ inline ]
pub(in crate) async fn get_bytes( &self, path : &str ) -> Result< Vec< u8 > >
{
let url = self.environment.join_base_url( path )?;
let http_client = &self.http_client;
let start_time = Instant::now();
if let Some( diagnostics ) = &self.diagnostics
{
let request_metrics = RequestMetrics
{
timestamp : start_time,
method : "GET".to_string(),
endpoint : path.to_string(),
headers : if diagnostics.config.collection.request_headers
{
vec![ ( "Accept".to_string(), "application/octet-stream".to_string() ) ]
}
else
{
vec![]
},
body_size : 0,
user_agent : "api_openai/0.2.0".to_string(),
};
diagnostics.record_request( &request_metrics );
}
let response = self.execute_request_with_retry( || {
http_client.request( Method::GET, url.clone() ).send()
}).await;
match response
{
Ok( response ) =>
{
let status_code = response.status().as_u16();
let response_time = start_time.elapsed();
if response.status().is_success()
{
let bytes = response.bytes().await
.map_err( | e | OpenAIError::Internal( format!( "Failed to read response bytes : {e}" ) ) )?;
if let Some( diagnostics ) = &self.diagnostics
{
let response_metrics = ResponseMetrics
{
timestamp : start_time,
status_code,
response_time,
body_size : bytes.len(),
headers : if diagnostics.config.collection.response_headers
{
vec![ ( "Content-Type".to_string(), "application/octet-stream".to_string() ) ]
}
else
{
vec![]
},
tokens_used : None,
};
diagnostics.record_response( &response_metrics );
}
Ok( bytes.to_vec() )
}
else
{
let error_text = response.text().await.unwrap_or_else( | _ | "Unknown error".to_string() );
Err( OpenAIError::Api( crate::error::ApiError {
code : Some( status_code.to_string() ),
message : error_text,
param : None,
r#type : Some( "http_error".to_string() ),
} ).into() )
}
}
Err( e ) => Err( e )
}
}
}
}