#[ cfg( feature = "streaming" ) ]
mod private
{
use super::super::types::orphan::*;
#[ cfg( feature = "error-handling" ) ]
use crate::error::{ AnthropicError, AnthropicResult };
#[ cfg( not( feature = "error-handling" ) ) ]
type AnthropicError = crate::error_tools::Error;
#[ cfg( not( feature = "error-handling" ) ) ]
type AnthropicResult< T > = Result< T, crate::error_tools::Error >;
use crate::client::CreateMessageRequest;
#[ cfg( feature = "streaming" ) ]
impl crate::client::Client
{
pub async fn create_message_stream( &self, request : CreateMessageRequest ) -> AnthropicResult< EventStream >
{
use tokio_stream::wrappers::UnboundedReceiverStream;
request.validate()?;
let url = format!( "{}/v1/messages", self.base_url() );
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"Content-Type",
"application/json".parse().expect( "Valid content type" )
);
headers.insert(
"x-api-key",
self.secret().ANTHROPIC_API_KEY.parse().expect( "Valid API key" )
);
headers.insert(
"anthropic-version",
self.config().api_version.parse().expect( "Valid API version" )
);
headers.insert(
"Accept",
"text/event-stream".parse().expect( "Valid accept header" )
);
headers.insert(
"Cache-Control",
"no-cache".parse().expect( "Valid cache control" )
);
let http_client = reqwest::Client::builder()
.timeout( self.config().request_timeout )
.build()
.map_err( | e | AnthropicError::http_error( format!( "Failed to build HTTP client : {e}" ) ) )?;
let response = http_client
.post( &url )
.headers( headers )
.json( &request )
.send()
.await
.map_err( AnthropicError::from )?;
if !response.status().is_success()
{
let status = response.status();
let error_text = response.text().await.unwrap_or_else( |_| "Unknown error".to_string() );
if let Ok( api_error ) = serde_json::from_str::< crate::error::ApiErrorWrap >( &error_text )
{
return Err( AnthropicError::Api( api_error.error ) );
}
return Err( AnthropicError::http_error_with_status( format!( "HTTP {status}: {error_text}" ), status.as_u16() ) );
}
let ( tx, rx ) = tokio::sync::mpsc::unbounded_channel();
let _handle = tokio::spawn( async move
{
let text = match response.text().await
{
Ok( text ) => text,
Err( e ) =>
{
let error = AnthropicError::http_error( format!( "Failed to read response : {e}" ) );
let _ = tx.send( Err( error ) );
return;
}
};
match parse_sse_events( &text )
{
Ok( events ) =>
{
for event in events
{
if tx.send( Ok( event ) ).is_err()
{
return;
}
}
},
Err( e ) =>
{
let _ = tx.send( Err( e ) );
}
}
} );
let stream = UnboundedReceiverStream::new( rx );
Ok( Box::pin( stream ) )
}
}
#[ allow( dead_code ) ] fn extract_complete_event( buffer : &str ) -> Option< ( String, String ) >
{
if let Some( pos ) = buffer.find( "\n\n" )
{
let event = buffer[ ..pos ].to_string();
let remaining = buffer[ pos + 2.. ].to_string();
return Some( ( event, remaining ) );
}
if buffer.ends_with( '\n' ) && buffer.lines().any( | line | line.starts_with( "data:" ) )
{
let lines : Vec< &str > = buffer.lines().collect();
if lines.len() >= 2
{
let mut event_lines = Vec::new();
let mut i = 0;
while i < lines.len()
{
let line = lines[ i ];
if line.starts_with( "event:" ) || line.starts_with( "data:" )
{
event_lines.push( line );
i += 1;
while i < lines.len() && lines[ i ].starts_with( "data:" )
{
event_lines.push( lines[ i ] );
i += 1;
}
if event_lines.iter().any( | l | l.starts_with( "data:" ) )
{
let event = event_lines.join( "\n" );
let remaining_lines = &lines[ i.. ];
let remaining = remaining_lines.join( "\n" );
return Some( ( event, remaining ) );
}
}
else
{
i += 1;
}
}
}
}
None
}
}
#[ cfg( feature = "streaming" ) ]
crate::mod_interface!
{
}