use std::sync::Arc;
use async_trait::async_trait;
use fusillade::http::{HttpClient as FusilladeHttpClient, HttpResponse};
use fusillade::{FusilladeError, PoolProvider as FusilladePool, RequestData, Result as FusilladeResult};
use onwards::client::HttpClient as OnwardsHttpClient;
use onwards::traits::{RequestContext, ToolExecutor};
use onwards::{LoopConfig, LoopError, MultiStepStore, UpstreamTarget};
use crate::responses::processor::DaemonToolResolver;
use crate::responses::store::{FusilladeResponseStore, PendingResponseInput};
use crate::tool_executor::ResolvedTools;
/// HTTP client adapter that implements [`FusilladeHttpClient`] by running
/// `onwards::run_response_loop` for each request it is asked to execute.
///
/// Every field is either reference-counted or copied by value, which is what
/// the hand-written `Clone` impl in this file relies on.
pub struct ResponseLoopHttpClient<P, T>
where
P: FusilladePool + Clone + Send + Sync + 'static,
T: ToolExecutor + 'static,
{
/// Store in which the pending request input is registered before the loop
/// runs, and through which the response is assembled and the head
/// sub-request finalized afterwards. Also handed to the loop itself.
pub response_store: Arc<FusilladeResponseStore<P>>,
/// Tool executor handed to the response loop.
pub tool_executor: Arc<T>,
/// HTTP client handed to the response loop.
// NOTE(review): presumably the transport the loop uses for upstream calls — confirm against `onwards`.
pub inner_http: Arc<dyn OnwardsHttpClient + Send + Sync>,
/// Optional resolver queried with the API key and model at the start of
/// each request. When `None`, no tool resolution is attempted.
pub tool_resolver: Option<Arc<dyn DaemonToolResolver>>,
/// Loop configuration passed unchanged to every loop run. It is copied by
/// value both when cloning the client and when starting a loop.
pub loop_config: LoopConfig,
}
impl<P, T> Clone for ResponseLoopHttpClient<P, T>
where
P: FusilladePool + Clone + Send + Sync + 'static,
T: ToolExecutor + 'static,
{
fn clone(&self) -> Self {
Self {
response_store: self.response_store.clone(),
tool_executor: self.tool_executor.clone(),
inner_http: self.inner_http.clone(),
tool_resolver: self.tool_resolver.clone(),
loop_config: self.loop_config,
}
}
}
// Bridges fusillade's HTTP-client abstraction onto the onwards response loop:
// one `execute` call corresponds to one full loop run.
#[async_trait]
impl<P, T> FusilladeHttpClient for ResponseLoopHttpClient<P, T>
where
    P: FusilladePool + Clone + Send + Sync + 'static,
    T: ToolExecutor + 'static,
{
    /// Runs the response loop for `request` and reports the outcome as an
    /// [`HttpResponse`].
    ///
    /// Steps, in order:
    /// 1. Resolve tools through `tool_resolver` (if configured). A resolver
    ///    error is logged and the loop proceeds with no tools.
    /// 2. Register the request's input with the response store under the
    ///    request id; the registration is removed again when this call ends.
    /// 3. Run `onwards::run_response_loop` against
    ///    `{endpoint}/v1/chat/completions`.
    /// 4. On success, assemble the response, finalize the head sub-request
    ///    with status 200 and return the serialized response. On a loop
    ///    error, finalize with status 500 (best effort, failures are only
    ///    logged) and return the error payload with status 500.
    ///
    /// # Errors
    ///
    /// Returns `FusilladeError::Other` when registering the pending input
    /// fails, or — on the success path — when assembling, finalizing or
    /// serializing the response fails. Loop failures are NOT errors; they are
    /// returned as `Ok` with status 500.
    async fn execute(&self, request: &RequestData, api_key: &str) -> FusilladeResult<HttpResponse> {
        let request_id = request.id.0.to_string();

        // --- Tool resolution -------------------------------------------------
        let mut tool_ctx = RequestContext::new().with_model(request.model.clone());
        let mut resolved_tool_names = std::collections::HashSet::new();
        if let Some(resolver) = self.tool_resolver.as_ref() {
            match resolver.resolve(api_key, &request.model).await {
                Ok(Some(resolved)) => {
                    // Snapshot the names before `resolved` is moved into the context.
                    resolved_tool_names = resolved.tools.keys().cloned().collect();
                    tool_ctx = tool_ctx.with_extension(ResolvedTools(Arc::new(resolved)));
                }
                Ok(None) => tracing::debug!(
                    request_id = %request.id,
                    model = %request.model,
                    "no tools resolved for daemon-driven /v1/responses request"
                ),
                // Deliberately non-fatal: fall through with an empty tool set.
                Err(e) => tracing::warn!(
                    error = %e,
                    request_id = %request.id,
                    "tool resolution failed for daemon path; running loop with no tools"
                ),
            }
        }

        // --- Pending-input registration --------------------------------------
        // Empty strings mean "absent" for both the API key and the creator.
        let api_key_opt = if api_key.is_empty() {
            None
        } else {
            Some(api_key.to_owned())
        };
        let created_by_opt = if request.created_by.is_empty() {
            None
        } else {
            Some(request.created_by.clone())
        };

        self.response_store
            .register_pending_with_id(
                request.id.0,
                PendingResponseInput {
                    body: request.body.clone(),
                    api_key: api_key_opt.clone(),
                    created_by: created_by_opt,
                    base_url: request.endpoint.clone(),
                    resolved_tool_names,
                },
            )
            .map_err(|e| {
                FusilladeError::Other(anyhow::anyhow!(
                    "register pending input for daemon-driven /v1/responses: {e}"
                ))
            })?;

        // Drop guard: unregisters the pending input on every exit path below,
        // including early `?` returns.
        let _pending_guard = scopeguard::guard(
            (Arc::clone(&self.response_store), request_id.clone()),
            |(store, id)| {
                store.unregister_pending(&id);
            },
        );

        // --- Loop execution --------------------------------------------------
        let chat_completions_url = format!(
            "{}/v1/chat/completions",
            request.endpoint.trim_end_matches('/')
        );
        let upstream = UpstreamTarget {
            url: chat_completions_url,
            api_key: api_key_opt,
        };

        let outcome = onwards::run_response_loop(
            &*self.response_store,
            &*self.tool_executor,
            &tool_ctx,
            &upstream,
            self.inner_http.clone(),
            None,
            &request_id,
            None,
            self.loop_config,
            0,
        )
        .await;

        // --- Outcome reporting -----------------------------------------------
        match outcome {
            Ok(_loop_output) => {
                let assembled = self
                    .response_store
                    .assemble_response(&request_id)
                    .await
                    .map_err(|e| FusilladeError::Other(anyhow::anyhow!("assemble_response after loop: {e}")))?;

                // Serialize up front so `assembled` can be moved into the store,
                // but only inspect the result after finalizing: a finalize
                // error must still take precedence over a serialization error.
                let serialized = serde_json::to_string(&assembled);

                self.response_store
                    .finalize_head_request(&request_id, 200, assembled)
                    .await
                    .map_err(|e| FusilladeError::Other(anyhow::anyhow!("finalize head sub-request: {e}")))?;

                let body = serialized
                    .map_err(|e| FusilladeError::Other(anyhow::anyhow!("serialize assembled response: {e}")))?;
                Ok(HttpResponse { status: 200, body })
            }
            Err(LoopError::Failed(payload)) => {
                // An unserializable payload degrades to an empty body.
                let body = serde_json::to_string(&payload).unwrap_or_default();
                if let Err(e) = self
                    .response_store
                    .finalize_head_request(&request_id, 500, payload)
                    .await
                {
                    tracing::warn!(error = %e, request_id = %request_id, "Failed to finalize head sub-request after loop failure");
                }
                Ok(HttpResponse { status: 500, body })
            }
            Err(other) => {
                // Any other loop error is wrapped in a generic error object.
                let payload = serde_json::json!({
                    "type": "loop_error",
                    "message": other.to_string(),
                });
                let body = serde_json::to_string(&payload).unwrap_or_default();
                if let Err(e) = self
                    .response_store
                    .finalize_head_request(&request_id, 500, payload)
                    .await
                {
                    tracing::warn!(error = %e, request_id = %request_id, "Failed to finalize head sub-request after unexpected loop error");
                }
                Ok(HttpResponse { status: 500, body })
            }
        }
    }
}