use crate::error::Result;
use crate::types::RequestData;
use async_trait::async_trait;
use opentelemetry::trace::TraceContextExt;
use std::time::Duration;
use tracing_opentelemetry::OpenTelemetrySpanExt;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpResponse {
pub status: u16,
pub body: String,
}
#[async_trait]
pub trait HttpClient: Send + Sync + Clone {
async fn execute(&self, request: &RequestData, api_key: &str) -> Result<HttpResponse>;
}
#[derive(Clone)]
pub struct ReqwestHttpClient {
client: reqwest::Client,
first_chunk_timeout: Duration,
chunk_timeout: Duration,
body_timeout: Duration,
}
impl ReqwestHttpClient {
pub fn new(
first_chunk_timeout: Duration,
chunk_timeout: Duration,
body_timeout: Duration,
) -> Self {
Self {
client: reqwest::Client::new(),
first_chunk_timeout,
chunk_timeout,
body_timeout,
}
}
}
impl Default for ReqwestHttpClient {
fn default() -> Self {
Self::new(ONE_DAY_DURATION, ONE_DAY_DURATION, ONE_DAY_DURATION)
}
}
const ONE_DAY_DURATION: Duration = Duration::from_secs(86_400);
#[async_trait]
impl HttpClient for ReqwestHttpClient {
#[tracing::instrument(name = "fusillade.execute", skip(self, request, api_key), fields(
otel.name = %format!("{} {}", request.method, request.path),
))]
async fn execute(&self, request: &RequestData, api_key: &str) -> Result<HttpResponse> {
let url = format!("{}{}", request.endpoint, request.path);
let span = tracing::Span::current();
span.set_attribute("otel.kind", "Client");
span.set_attribute("http.request.method", request.method.clone());
span.set_attribute("url.path", request.path.clone());
span.set_attribute("url.full", url.clone());
tracing::debug!(
url.full = %url,
first_chunk_timeout_ms = self.first_chunk_timeout.as_millis() as u64,
chunk_timeout_ms = self.chunk_timeout.as_millis() as u64,
body_timeout_ms = self.body_timeout.as_millis() as u64,
"Executing HTTP request"
);
let mut req = self.client.request(
request.method.parse().map_err(|e| {
tracing::error!(method = %request.method, error = %e, "Invalid HTTP method");
anyhow::anyhow!("Invalid HTTP method '{}': {}", request.method, e)
})?,
&url,
);
if !api_key.is_empty() {
req = req.header("Authorization", format!("Bearer {}", api_key));
tracing::trace!(request_id = %request.id, "Added Authorization header");
}
req = req.header("X-Fusillade-Request-Id", request.id.0.to_string());
for (key, value) in &request.batch_metadata {
let header_name = format!("x-fusillade-batch-{}", key.replace('_', "-"));
req = req.header(&header_name, value);
}
if let Some(custom_id) = &request.custom_id {
req = req.header("X-Fusillade-Custom-Id", custom_id.clone());
tracing::trace!(request_id = %request.id, custom_id = %custom_id, "Added X-Fusillade-Custom-Id header");
}
let ctx = tracing::Span::current().context();
let span_ref = ctx.span();
let span_ctx = span_ref.span_context();
if span_ctx.is_valid() {
let traceparent = format!(
"00-{}-{}-{:02x}",
span_ctx.trace_id(),
span_ctx.span_id(),
span_ctx.trace_flags().to_u8()
);
req = req.header("traceparent", &traceparent);
tracing::trace!(request_id = %request.id, traceparent = %traceparent, "Added traceparent header for distributed tracing");
}
let method_upper = request.method.to_uppercase();
if method_upper != "GET"
&& method_upper != "HEAD"
&& method_upper != "DELETE"
&& !request.body.is_empty()
{
req = req
.header("Content-Type", "application/json")
.body(request.body.clone());
tracing::trace!(
request_id = %request.id,
body_len = request.body.len(),
"Added request body"
);
}
if request.stream {
self.execute_streaming(request, req, &url).await
} else {
self.execute_non_streaming(request, req, &url).await
}
}
}
impl ReqwestHttpClient {
async fn execute_non_streaming(
&self,
request: &RequestData,
req: reqwest::RequestBuilder,
url: &str,
) -> Result<HttpResponse> {
let total_timeout = self.first_chunk_timeout + self.body_timeout;
let response = req.timeout(total_timeout).send().await.map_err(|e| {
if e.is_builder() {
tracing::error!(
request_id = %request.id,
url.full = %url,
error = %e.to_string(),
custom_id = ?request.custom_id,
batch_metadata_keys = ?request.batch_metadata.keys().collect::<Vec<_>>(),
"Failed to build HTTP request (not retriable) - likely invalid header value"
);
} else {
tracing::error!(
request_id = %request.id,
url.full = %url,
error = %e,
"HTTP request failed"
);
}
e
})?;
let status = response.status().as_u16();
let body = response.text().await?;
tracing::debug!(
request_id = %request.id,
status = status,
response_len = body.len(),
"HTTP request completed"
);
Ok(HttpResponse { status, body })
}
async fn execute_streaming(
&self,
request: &RequestData,
req: reqwest::RequestBuilder,
url: &str,
) -> Result<HttpResponse> {
let (mut response, status, first_chunk) = tokio::time::timeout(
self.first_chunk_timeout,
async {
let mut resp = req
.send()
.await
.map_err(|e| -> crate::error::FusilladeError { e.into() })
.inspect_err(|e| {
tracing::error!(
request_id = %request.id,
url.full = %url,
error = %e,
custom_id = ?request.custom_id,
batch_metadata_keys = ?request.batch_metadata.keys().collect::<Vec<_>>(),
"HTTP request failed"
);
})?;
let status = resp.status().as_u16();
let first_chunk = resp
.chunk()
.await
.map_err(|e| -> crate::error::FusilladeError { e.into() })?;
Ok::<_, crate::error::FusilladeError>((resp, status, first_chunk))
},
)
.await
.map_err(|_| {
crate::error::FusilladeError::FirstChunkTimeout(format!(
"No first token from {} within {}ms",
url,
self.first_chunk_timeout.as_millis()
))
})??;
let body_bytes = tokio::time::timeout(self.body_timeout, async {
let mut buf = Vec::from(first_chunk.unwrap_or_default());
loop {
match tokio::time::timeout(self.chunk_timeout, response.chunk()).await {
Ok(Ok(Some(chunk))) => buf.extend_from_slice(&chunk),
Ok(Ok(None)) => break,
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
return Err(crate::error::FusilladeError::TokensTimeout(format!(
"Body read stalled from {} after {}ms ({} bytes received)",
url,
self.chunk_timeout.as_millis(),
buf.len()
)));
}
}
}
Ok(buf)
})
.await
.map_err(|_| {
crate::error::FusilladeError::BodyTimeout(format!(
"Total body read from {} exceeded {}ms",
url,
self.body_timeout.as_millis()
))
})??;
let body = String::from_utf8(body_bytes)
.map_err(|e| anyhow::anyhow!("Response body from {} is not valid UTF-8: {}", url, e))?;
tracing::debug!(
request_id = %request.id,
status = status,
response_len = body.len(),
"HTTP request completed"
);
Ok(HttpResponse { status, body })
}
}
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::oneshot;
#[derive(Clone)]
pub struct MockHttpClient {
responses: Arc<Mutex<HashMap<String, Vec<MockResponse>>>>,
calls: Arc<Mutex<Vec<MockCall>>>,
in_flight: Arc<AtomicUsize>,
}
enum MockResponse {
Immediate(Result<HttpResponse>),
Triggered {
response: Result<HttpResponse>,
trigger: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
},
}
#[derive(Debug, Clone)]
pub struct MockCall {
pub method: String,
pub endpoint: String,
pub path: String,
pub body: String,
pub api_key: String,
pub batch_metadata: std::collections::HashMap<String, String>,
}
impl MockHttpClient {
pub fn new() -> Self {
Self {
responses: Arc::new(Mutex::new(HashMap::new())),
calls: Arc::new(Mutex::new(Vec::new())),
in_flight: Arc::new(AtomicUsize::new(0)),
}
}
pub fn add_response(&self, key: &str, response: Result<HttpResponse>) {
self.responses
.lock()
.entry(key.to_string())
.or_default()
.push(MockResponse::Immediate(response));
}
pub fn add_response_with_trigger(
&self,
key: &str,
response: Result<HttpResponse>,
) -> oneshot::Sender<()> {
let (tx, rx) = oneshot::channel();
self.responses
.lock()
.entry(key.to_string())
.or_default()
.push(MockResponse::Triggered {
response,
trigger: Arc::new(Mutex::new(Some(rx))),
});
tx
}
pub fn get_calls(&self) -> Vec<MockCall> {
self.calls.lock().clone()
}
pub fn clear_calls(&self) {
self.calls.lock().clear();
}
pub fn call_count(&self) -> usize {
self.calls.lock().len()
}
pub fn in_flight_count(&self) -> usize {
self.in_flight.load(Ordering::SeqCst)
}
}
impl Default for MockHttpClient {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl HttpClient for MockHttpClient {
async fn execute(&self, request: &RequestData, api_key: &str) -> Result<HttpResponse> {
self.in_flight.fetch_add(1, Ordering::SeqCst);
let in_flight = self.in_flight.clone();
let _guard = InFlightGuard { in_flight };
self.calls.lock().push(MockCall {
method: request.method.clone(),
endpoint: request.endpoint.clone(),
path: request.path.clone(),
body: request.body.clone(),
api_key: api_key.to_string(),
batch_metadata: request.batch_metadata.clone(),
});
let key = format!("{} {}", request.method, request.path);
let mock_response = {
let mut responses = self.responses.lock();
if let Some(response_queue) = responses.get_mut(&key) {
if !response_queue.is_empty() {
Some(response_queue.remove(0))
} else {
None
}
} else {
None
}
};
match mock_response {
Some(MockResponse::Immediate(response)) => response,
Some(MockResponse::Triggered { response, trigger }) => {
let rx = {
let mut trigger_guard = trigger.lock();
trigger_guard.take()
};
if let Some(rx) = rx {
let _ = rx.await;
}
response
}
None => {
Err(crate::error::FusilladeError::Other(anyhow::anyhow!(
"No mock response configured for {} {}",
request.method,
request.path
)))
}
}
}
}
struct InFlightGuard {
in_flight: Arc<AtomicUsize>,
}
impl Drop for InFlightGuard {
fn drop(&mut self) {
self.in_flight.fetch_sub(1, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::RequestId;
#[tokio::test]
async fn test_mock_client_basic() {
let mock = MockHttpClient::new();
mock.add_response(
"POST /test",
Ok(HttpResponse {
status: 200,
body: "success".to_string(),
}),
);
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: "https://api.example.com".to_string(),
method: "POST".to_string(),
path: "/test".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "test-key".to_string(),
stream: false,
batch_metadata: std::collections::HashMap::new(),
};
let response = mock.execute(&request, "test-key").await.unwrap();
assert_eq!(response.status, 200);
assert_eq!(response.body, "success");
let calls = mock.get_calls();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].method, "POST");
assert_eq!(calls[0].path, "/test");
assert_eq!(calls[0].api_key, "test-key");
}
#[tokio::test]
async fn test_mock_client_multiple_responses() {
let mock = MockHttpClient::new();
mock.add_response(
"GET /status",
Ok(HttpResponse {
status: 200,
body: "first".to_string(),
}),
);
mock.add_response(
"GET /status",
Ok(HttpResponse {
status: 200,
body: "second".to_string(),
}),
);
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: "https://api.example.com".to_string(),
method: "GET".to_string(),
path: "/status".to_string(),
body: "".to_string(),
model: "test-model".to_string(),
api_key: "test-key".to_string(),
stream: false,
batch_metadata: std::collections::HashMap::new(),
};
let response1 = mock.execute(&request, "key").await.unwrap();
assert_eq!(response1.body, "first");
let response2 = mock.execute(&request, "key").await.unwrap();
assert_eq!(response2.body, "second");
assert_eq!(mock.call_count(), 2);
}
#[tokio::test]
async fn test_mock_client_no_response() {
let mock = MockHttpClient::new();
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: "https://api.example.com".to_string(),
method: "POST".to_string(),
path: "/unknown".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "test-key".to_string(),
stream: false,
batch_metadata: std::collections::HashMap::new(),
};
let result = mock.execute(&request, "key").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_mock_client_with_trigger() {
let mock = MockHttpClient::new();
let trigger = mock.add_response_with_trigger(
"POST /test",
Ok(HttpResponse {
status: 200,
body: "triggered".to_string(),
}),
);
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: "https://api.example.com".to_string(),
method: "POST".to_string(),
path: "/test".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "test-key".to_string(),
stream: false,
batch_metadata: std::collections::HashMap::new(),
};
let mock_clone = mock.clone();
let handle = tokio::spawn(async move { mock_clone.execute(&request, "key").await });
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert!(!handle.is_finished());
trigger.send(()).unwrap();
let response = handle.await.unwrap().unwrap();
assert_eq!(response.status, 200);
assert_eq!(response.body, "triggered");
}
#[tokio::test]
async fn test_mock_client_records_batch_metadata() {
let mock = MockHttpClient::new();
mock.add_response(
"POST /test",
Ok(HttpResponse {
status: 200,
body: "success".to_string(),
}),
);
let mut batch_metadata = std::collections::HashMap::new();
batch_metadata.insert("id".to_string(), "batch-123".to_string());
batch_metadata.insert(
"endpoint".to_string(),
"https://api.example.com".to_string(),
);
batch_metadata.insert("created_at".to_string(), "2025-12-19T12:00:00Z".to_string());
batch_metadata.insert("completion_window".to_string(), "2s".to_string());
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: "https://api.example.com".to_string(),
method: "POST".to_string(),
path: "/test".to_string(),
body: r#"{"key":"value"}"#.to_string(),
model: "test-model".to_string(),
api_key: "test-key".to_string(),
stream: false,
batch_metadata: batch_metadata.clone(),
};
let response = mock.execute(&request, "test-key").await.unwrap();
assert_eq!(response.status, 200);
assert_eq!(response.body, "success");
let calls = mock.get_calls();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].batch_metadata.len(), 4);
assert_eq!(
calls[0].batch_metadata.get("id"),
Some(&"batch-123".to_string())
);
assert_eq!(
calls[0].batch_metadata.get("endpoint"),
Some(&"https://api.example.com".to_string())
);
assert_eq!(
calls[0].batch_metadata.get("created_at"),
Some(&"2025-12-19T12:00:00Z".to_string())
);
assert_eq!(
calls[0].batch_metadata.get("completion_window"),
Some(&"2s".to_string())
);
}
#[tokio::test]
async fn test_reqwest_client_sets_batch_metadata_headers() {
use axum::{Router, extract::Request, http::StatusCode, routing::post};
let app = Router::new().route(
"/test",
post(|request: Request| async move {
let headers = request.headers();
assert_eq!(
headers
.get("x-fusillade-batch-id")
.and_then(|h| h.to_str().ok()),
Some("batch-456"),
"Missing or incorrect x-fusillade-batch-id header"
);
assert_eq!(
headers
.get("x-fusillade-batch-endpoint")
.and_then(|h| h.to_str().ok()),
Some("/v1/completions"),
"Missing or incorrect x-fusillade-batch-endpoint header"
);
assert_eq!(
headers
.get("x-fusillade-batch-created-at")
.and_then(|h| h.to_str().ok()),
Some("2025-12-19T13:00:00Z"),
"Missing or incorrect x-fusillade-batch-created-at header"
);
assert_eq!(
headers
.get("x-fusillade-batch-completion-window")
.and_then(|h| h.to_str().ok()),
Some("24h"),
"Missing or incorrect x-fusillade-batch-completion-window header"
);
assert_eq!(
headers.get("authorization").and_then(|h| h.to_str().ok()),
Some("Bearer test-api-key"),
"Missing or incorrect authorization header"
);
assert!(
headers.get("x-fusillade-request-id").is_some(),
"Missing x-fusillade-request-id header"
);
(StatusCode::OK, r#"{"result":"ok"}"#)
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut batch_metadata = std::collections::HashMap::new();
batch_metadata.insert("id".to_string(), "batch-456".to_string());
batch_metadata.insert("endpoint".to_string(), "/v1/completions".to_string());
batch_metadata.insert("created_at".to_string(), "2025-12-19T13:00:00Z".to_string());
batch_metadata.insert("completion_window".to_string(), "24h".to_string());
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: format!("http://{}", addr),
method: "POST".to_string(),
path: "/test".to_string(),
body: r#"{"prompt":"test"}"#.to_string(),
model: "test-model".to_string(),
api_key: "test-api-key".to_string(),
stream: false,
batch_metadata,
};
let client = ReqwestHttpClient::default();
let response = client.execute(&request, "test-api-key").await.unwrap();
assert_eq!(response.status, 200);
assert_eq!(response.body, r#"{"result":"ok"}"#);
}
#[tokio::test]
async fn test_read_timeout_on_stalled_body() {
use axum::{Router, http::StatusCode, routing::post};
let app = Router::new().route(
"/test",
post(|| async {
use futures::StreamExt;
let stream = futures::stream::once(async {
Ok::<_, std::convert::Infallible>("partial".to_string().into_bytes())
})
.chain(futures::stream::pending());
let body = axum::body::Body::from_stream(stream);
(StatusCode::OK, body)
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: format!("http://{}", addr),
method: "POST".to_string(),
path: "/test".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "".to_string(),
stream: true,
batch_metadata: std::collections::HashMap::new(),
};
let timeout = Duration::from_millis(200);
let client = ReqwestHttpClient::new(timeout, timeout, ONE_DAY_DURATION);
let result = client.execute(&request, "").await;
let err = result.expect_err("Expected TokensTimeout for stalled body");
match err {
crate::error::FusilladeError::TokensTimeout(msg) => {
assert!(msg.contains("Body read stalled"));
}
other => panic!("Expected TokensTimeout, got: {:?}", other),
}
}
#[tokio::test]
async fn test_read_timeout_on_stalled_headers() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
drop(socket);
}
});
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: format!("http://{}", addr),
method: "POST".to_string(),
path: "/test".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "".to_string(),
stream: true,
batch_metadata: std::collections::HashMap::new(),
};
let timeout = Duration::from_millis(200);
let client = ReqwestHttpClient::new(timeout, timeout, ONE_DAY_DURATION);
let result = client.execute(&request, "").await;
let err = result.expect_err("Expected FirstChunkTimeout for stalled headers");
match err {
crate::error::FusilladeError::FirstChunkTimeout(msg) => {
assert!(msg.contains("No first token from"));
}
other => panic!("Expected FirstChunkTimeout, got: {:?}", other),
}
}
#[tokio::test]
async fn test_body_timeout_on_slow_drip() {
use axum::{Router, http::StatusCode, routing::post};
use futures::StreamExt;
let app = Router::new().route(
"/test",
post(|| async {
let stream = futures::stream::unfold(0u32, |i| async move {
if i >= 20 {
return None;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Some((
Ok::<_, std::convert::Infallible>(format!("chunk-{i}").into_bytes()),
i + 1,
))
})
.boxed();
let body = axum::body::Body::from_stream(stream);
(StatusCode::OK, body)
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: None,
endpoint: format!("http://{}", addr),
method: "POST".to_string(),
path: "/test".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "".to_string(),
stream: true,
batch_metadata: std::collections::HashMap::new(),
};
let client = ReqwestHttpClient::new(
ONE_DAY_DURATION,
Duration::from_millis(200),
Duration::from_millis(300),
);
let result = client.execute(&request, "").await;
let err = result.expect_err("Expected BodyTimeout for slow-drip response");
match err {
crate::error::FusilladeError::BodyTimeout(msg) => {
assert!(msg.contains("Total body read from"));
}
other => panic!("Expected BodyTimeout, got: {:?}", other),
}
}
#[tokio::test]
async fn test_custom_id_with_newline_is_not_retriable() {
use crate::request::types::FailureReason;
let request = RequestData {
id: RequestId::from(uuid::Uuid::new_v4()),
batch_id: crate::batch::BatchId::from(uuid::Uuid::new_v4()),
template_id: crate::batch::TemplateId::from(uuid::Uuid::new_v4()),
custom_id: Some("invalid\ncustom_id".to_string()), endpoint: "https://api.example.com".to_string(),
method: "POST".to_string(),
path: "/test".to_string(),
body: "{}".to_string(),
model: "test-model".to_string(),
api_key: "test-key".to_string(),
stream: false,
batch_metadata: std::collections::HashMap::new(),
};
let client = ReqwestHttpClient::default();
let result = client.execute(&request, "test-key").await;
let err = result.expect_err("Expected builder error for invalid header value");
let reason = match err {
crate::error::FusilladeError::HttpClient(ref reqwest_err)
if reqwest_err.is_builder() =>
{
FailureReason::RequestBuilderError {
error: reqwest_err.to_string(),
}
}
_ => panic!("Expected HttpClient builder error, got: {:?}", err),
};
assert!(
!reason.is_retriable(),
"Builder errors should not be retriable"
);
}
}