use std::sync::Arc;
use futures_util::StreamExt;
use serde_json::Value;
use crate::clients::base::{ChunkType, LLMClient, LLMRequestOptions, LLMResponse};
use crate::core::tool_spec::ToolSpec;
use crate::error::StreamError;
use super::response_shape::text_response_result;
use super::HandlerResult;
/// Forwards an already-serialized conversation to the backend without tool support.
///
/// Any `_tools` supplied by the caller are ignored, and `None` is passed in the
/// client's second argument slot (presumably the tool list — confirm against
/// `LLMClient`). When `stream` is true the request is delegated to
/// `run_passthrough_stream`. Otherwise a single envelope is requested and
/// converted with `text_response_result`, carrying the envelope's usage data.
///
/// # Errors
///
/// Returns the backend error's string form if the request fails, or a fixed
/// message if the backend answers with tool calls.
pub async fn run_passthrough<C: LLMClient + 'static>(
    client: &Arc<C>,
    serialized: &[Value],
    _tools: Option<Vec<ToolSpec>>,
    options: LLMRequestOptions,
    model_name: &str,
    stream: bool,
    stream_include_usage: bool,
) -> Result<HandlerResult, String> {
    if stream {
        return run_passthrough_stream(
            client,
            serialized,
            options,
            model_name,
            stream_include_usage,
        )
        .await;
    }

    let reply = match client
        .send_envelope_with_options(serialized.to_vec(), None, options)
        .await
    {
        Ok(envelope) => envelope,
        Err(err) => return Err(err.to_string()),
    };

    let text = match reply.response {
        LLMResponse::ToolCalls(_) => {
            return Err(String::from(
                "backend returned tool calls for request without tools",
            ));
        }
        LLMResponse::Text(text) => text,
    };

    // `stream` is necessarily false here: the streaming case returned above.
    Ok(text_response_result(
        &text,
        model_name,
        false,
        stream_include_usage,
        reply.usage.as_ref(),
        reply.usage_details.as_ref(),
    ))
}
/// Streaming variant of `run_passthrough`: relays backend text as SSE events.
///
/// The backend stream is opened eagerly, so a failure to start it surfaces as
/// `Err(String)`. It is then wrapped in a lazily-driven SSE stream that:
///
/// * forwards every non-empty `TextDelta` as a text-delta event, passing `true`
///   as the fourth argument only for the first delta emitted;
/// * on `Final`, emits the response text as a single (first) delta if no delta
///   was sent yet, then a closing event with finish reason `"stop"`. The usage
///   JSON is attached only when `include_usage` is set and usage is known
///   (from the final chunk, falling back to the client's last recorded usage);
/// * ends with an `Err` item on backend errors, tool-call deltas, or tool calls
///   in the final response;
/// * ignores `Retry` chunks;
/// * yields `StreamError::default()` if the backend stream ends without `Final`.
async fn run_passthrough_stream<C: LLMClient + 'static>(
    client: &Arc<C>,
    serialized: &[Value],
    options: LLMRequestOptions,
    model_name: &str,
    include_usage: bool,
) -> Result<HandlerResult, String> {
    let opened = client
        .send_stream_with_options(serialized.to_vec(), None, options)
        .await;
    let mut upstream = match opened {
        Ok(upstream) => upstream,
        Err(err) => return Err(err.to_string()),
    };
    let usage_source = Arc::clone(client);
    let model = model_name.to_owned();

    let events = async_stream::stream! {
        let completion_id = crate::proxy::proxy::openai_stream_completion_id();
        // Whether any text delta has been sent yet; only the first one is flagged.
        let mut sent_any_text = false;

        while let Some(item) = upstream.next().await {
            let chunk = match item {
                Err(err) => {
                    yield Err(err);
                    return;
                }
                Ok(chunk) => chunk,
            };

            match chunk.chunk_type {
                ChunkType::Retry => continue,
                ChunkType::ToolCallDelta => {
                    yield Err(StreamError::new(
                        "backend streamed tool calls for request without tools",
                    ));
                    return;
                }
                ChunkType::TextDelta => {
                    if chunk.content.is_empty() {
                        continue;
                    }
                    let is_first = !sent_any_text;
                    yield Ok(crate::proxy::proxy::text_delta_sse_event(
                        &completion_id,
                        &model,
                        &chunk.content,
                        is_first,
                        None,
                    ));
                    sent_any_text = true;
                }
                ChunkType::Final => {
                    let reported_usage = chunk.usage;
                    let reported_details = chunk.usage_details;

                    if !sent_any_text {
                        // Nothing was streamed incrementally: send the whole reply
                        // as the one (first) text delta.
                        let full_text = match chunk.response {
                            None => String::new(),
                            Some(LLMResponse::Text(text)) => text.content,
                            Some(LLMResponse::ToolCalls(_)) => {
                                yield Err(StreamError::new(
                                    "backend returned tool calls for request without tools",
                                ));
                                return;
                            }
                        };
                        yield Ok(crate::proxy::proxy::text_delta_sse_event(
                            &completion_id,
                            &model,
                            &full_text,
                            true,
                            None,
                        ));
                    }

                    // Usage reported on the final chunk wins; otherwise ask the
                    // client. The client is only consulted when usage was requested.
                    let usage_json = include_usage
                        .then(|| {
                            let usage =
                                reported_usage.or_else(|| usage_source.last_usage());
                            let details = reported_details
                                .or_else(|| usage_source.last_usage_details());
                            usage.as_ref().map(|u| {
                                crate::proxy::proxy::usage_to_openai_json_with_details(
                                    Some(u),
                                    details.as_ref(),
                                )
                            })
                        })
                        .flatten();

                    yield Ok(crate::proxy::proxy::final_sse_event(
                        &completion_id,
                        &model,
                        "stop",
                        usage_json.as_ref(),
                    ));
                    return;
                }
            }
        }

        // The backend stream ended without ever sending a `Final` chunk.
        yield Err(StreamError::default());
    };

    Ok(HandlerResult::StreamBody(Box::pin(events)))
}