use futures::stream::BoxStream;
use serde_json::Value;
use crate::agent::agent_core::{DeepseekAgent, ToolCallResult};
use crate::api::ApiRequest;
use crate::error::ApiError;
use crate::raw::ChatCompletionChunk;
use crate::raw::request::message::{FunctionCall, Message, Role, ToolCall, ToolType};
/// Parts of an assistant reply that `fetch_response` extracts from the first
/// choice of a non-streaming completion.
pub(crate) struct FetchResult {
/// The assistant message's `content`, if it had any.
pub(crate) content: Option<String>,
/// The assistant message's `reasoning_content`, if it had any.
pub(crate) reasoning_content: Option<String>,
/// Tool calls requested by the assistant; empty when the message carried none.
pub(crate) raw_tool_calls: Vec<ToolCall>,
}
/// Outcome of one `execute_tools` round.
pub(crate) struct ToolsResult {
/// One entry per tool call that was processed, in processing order.
pub(crate) results: Vec<ToolCallResult>,
}
/// A tool call that is still being assembled from streamed deltas
/// (see `apply_chunk_delta`); turned into a `ToolCall` by `finalize_stream`.
pub(crate) struct PartialToolCall {
/// Tool call id; empty until a delta supplies one.
pub(crate) id: String,
/// Name of the function to call; empty if no delta has supplied it.
pub(crate) name: String,
/// Concatenation of the argument fragments received so far.
pub(crate) arguments: String,
}
/// State carried while a streamed completion is being consumed.
pub(crate) struct StreamingData {
/// The chunk stream being read.
pub(crate) stream: BoxStream<'static, Result<ChatCompletionChunk, ApiError>>,
/// The agent that owns the conversation; `finalize_stream` appends the
/// assembled assistant message to its history.
pub(crate) agent: DeepseekAgent,
/// Content text accumulated from the deltas seen so far.
pub(crate) content_buf: String,
/// Reasoning text accumulated from the deltas seen so far.
pub(crate) reasoning_buf: String,
/// Partially assembled tool calls, addressed by the `index` carried in each
/// tool-call delta; slots whose index has not been seen stay `None`.
pub(crate) tool_call_bufs: Vec<Option<PartialToolCall>>,
}
/// Incremental events produced by `apply_chunk_delta` for a single stream chunk.
pub(crate) enum ChunkEvent {
/// A non-empty fragment of assistant content.
Token(String),
/// A non-empty fragment of reasoning content.
ReasoningToken(String),
/// Progress on a streamed tool call.
ToolCallChunk {
/// Tool call id as known when the event was produced (may be empty).
id: String,
/// Function name as known when the event was produced (may be empty).
name: String,
/// Argument fragment carried by this event; empty for the event that
/// announces a newly seen tool call.
delta: String,
/// Index of the tool call within the streamed response.
index: u32,
},
}
/// Boxed, pinned, `Send` future with the same output as `fetch_response`:
/// the fetch outcome paired with the agent.
pub(crate) type FetchFuture = std::pin::Pin<
Box<dyn std::future::Future<Output = (Result<FetchResult, ApiError>, DeepseekAgent)> + Send>,
>;
/// Boxed, pinned, `Send` future with the same output as `connect_stream`:
/// either the opened chunk stream or the connection error, paired with the agent.
pub(crate) type ConnectFuture = std::pin::Pin<
Box<
dyn std::future::Future<
Output = (
Result<BoxStream<'static, Result<ChatCompletionChunk, ApiError>>, ApiError>,
DeepseekAgent,
),
> + Send,
>,
>;
/// Boxed, pinned, `Send` future with the same output as `execute_tools`:
/// the tool results paired with the agent.
pub(crate) type ExecFuture =
std::pin::Pin<Box<dyn std::future::Future<Output = (ToolsResult, DeepseekAgent)> + Send>>;
/// Boxed, pinned, `Send` future with the same output as `run_summarize`:
/// the agent itself.
pub(crate) type SummarizeFuture =
std::pin::Pin<Box<dyn std::future::Future<Output = DeepseekAgent> + Send>>;
pub(crate) fn build_request(agent: &DeepseekAgent) -> ApiRequest {
let history = agent.conversation.history();
let messages: Vec<Message> = history
.iter()
.map(|m| {
if !matches!(m.role, Role::Assistant) {
return m.clone();
}
let has_tool_calls = m.tool_calls.as_ref().map(|v| !v.is_empty()).unwrap_or(false);
if has_tool_calls {
Message {
reasoning_content: Some(
m.reasoning_content.clone().unwrap_or_default()
),
..m.clone()
}
} else {
Message {
reasoning_content: None,
..m.clone()
}
}
})
.collect();
let mut req = ApiRequest::builder()
.with_model(agent.model.clone())
.messages(messages);
for tool in &agent.tools {
for raw in tool.raw_tools() {
req = req.add_tool(raw);
}
}
if !agent.tools.is_empty() {
req = req.tool_choice_auto();
}
if let Some(ref map) = agent.extra_body {
req = req.extra_body(map.clone());
}
req
}
/// Awaits `maybe_summarize` on the agent's conversation and hands the agent
/// back, so the call can be driven as an owned future.
pub(crate) async fn run_summarize(mut agent: DeepseekAgent) -> DeepseekAgent {
agent.conversation.maybe_summarize().await;
agent
}
/// Sends one non-streaming completion request and records the reply.
///
/// The request is built with `build_request`. On success the first choice's
/// assistant message is appended to the conversation history and its
/// content, reasoning content and tool calls are returned as a
/// [`FetchResult`].
///
/// The agent is always returned alongside the outcome. The history is left
/// untouched when the request fails or when the response has no choices; the
/// latter is reported as `ApiError::Other`.
pub(crate) async fn fetch_response(
    mut agent: DeepseekAgent,
) -> (Result<FetchResult, ApiError>, DeepseekAgent) {
    let request = build_request(&agent);
    let response = match agent.conversation.client.send(request).await {
        Err(err) => return (Err(err), agent),
        Ok(response) => response,
    };

    let Some(first_choice) = response.choices.into_iter().next() else {
        return (
            Err(ApiError::Other("empty response: no choices".into())),
            agent,
        );
    };

    // Copy out what the caller needs before the message moves into history.
    let message = first_choice.message;
    let fetched = FetchResult {
        content: message.content.clone(),
        reasoning_content: message.reasoning_content.clone(),
        raw_tool_calls: message.tool_calls.clone().unwrap_or_default(),
    };
    agent.conversation.history_mut().push(message);

    (Ok(fetched), agent)
}
/// Opens a streaming completion for the agent's current conversation.
///
/// The request is built with `build_request` and sent through a clone of the
/// conversation's client. The agent is returned unchanged together with
/// either the chunk stream or the error reported while opening it.
pub(crate) async fn connect_stream(
    agent: DeepseekAgent,
) -> (
    Result<BoxStream<'static, Result<ChatCompletionChunk, ApiError>>, ApiError>,
    DeepseekAgent,
) {
    let request = build_request(&agent);
    let client = agent.conversation.client.clone();
    match client.into_stream(request).await {
        Err(err) => (Err(err), agent),
        Ok(chunks) => (Ok(chunks), agent),
    }
}
pub(crate) async fn execute_tools(
mut agent: DeepseekAgent,
raw_tool_calls: Vec<ToolCall>,
) -> (ToolsResult, DeepseekAgent) {
let mut results = Vec::with_capacity(raw_tool_calls.len());
let mut buffered_interrupts: Vec<String> = Vec::new();
for tc in raw_tool_calls {
let args: Value = serde_json::from_str(&tc.function.arguments).unwrap_or(Value::Null);
let mut result = serde_json::json!({ "error": "unknown tool" });
let mut aborted = false;
if let Some(&idx) = agent.tool_index.get(&tc.function.name) {
tokio::select! {
res = agent.tools[idx].call(&tc.function.name, args.clone()) => {
result = res;
}
maybe_msg = agent.interrupt_rx.recv() => {
if let Some(msg) = maybe_msg {
buffered_interrupts.push(msg);
while let Ok(more) = agent.interrupt_rx.try_recv() {
buffered_interrupts.push(more);
}
}
result = serde_json::json!({ "error": "aborted by interrupt" });
aborted = true;
}
}
} else {
result = serde_json::json!({ "error": format!("unknown tool: {}", tc.function.name) });
}
agent.conversation.history_mut().push(Message {
role: Role::Tool,
content: Some(result.to_string()),
tool_call_id: Some(tc.id.clone()),
..Default::default()
});
results.push(ToolCallResult {
id: tc.id,
name: tc.function.name,
args: tc.function.arguments,
result,
});
if aborted {
break;
}
}
while let Ok(msg) = agent.interrupt_rx.try_recv() {
buffered_interrupts.push(msg);
}
for msg in buffered_interrupts {
agent.conversation.history_mut().push(Message::user(&msg));
}
(ToolsResult { results }, agent)
}
/// Closes out a finished stream: turns the accumulated buffers into one
/// assistant message, appends it to the agent's history and returns the
/// completed tool calls.
///
/// All tool-call slots are drained (unused `None` slots are skipped) and the
/// content and reasoning buffers are emptied. Each of `content`,
/// `reasoning_content` and `tool_calls` is `None` on the message when the
/// corresponding buffer was empty.
pub(crate) fn finalize_stream(data: &mut StreamingData) -> Vec<ToolCall> {
    /// Moves the text out of `buf`, or yields `None` when there is no text.
    fn take_non_empty(buf: &mut String) -> Option<String> {
        (!buf.is_empty()).then(|| std::mem::take(buf))
    }

    let mut completed: Vec<ToolCall> = Vec::new();
    for partial in data.tool_call_bufs.drain(..).flatten() {
        completed.push(ToolCall {
            id: partial.id,
            r#type: ToolType::Function,
            function: FunctionCall {
                name: partial.name,
                arguments: partial.arguments,
            },
        });
    }

    let content = take_non_empty(&mut data.content_buf);
    let reasoning_content = take_non_empty(&mut data.reasoning_buf);
    // The calls are returned to the caller as well, hence the copy.
    let tool_calls = (!completed.is_empty()).then(|| completed.clone());

    data.agent.conversation.history_mut().push(Message {
        role: Role::Assistant,
        content,
        reasoning_content,
        tool_calls,
        ..Default::default()
    });

    completed
}
/// Folds one stream chunk into the streaming buffers and reports what changed.
///
/// Only the first choice of the chunk is considered; a chunk without choices
/// produces no events. Every field present on the delta is applied, so a
/// chunk that carries more than one of them loses nothing:
///
/// * non-empty `reasoning_content` is appended to `data.reasoning_buf` and
///   reported as [`ChunkEvent::ReasoningToken`];
/// * non-empty `content` is appended to `data.content_buf` and reported as
///   [`ChunkEvent::Token`];
/// * each tool-call delta is merged into `data.tool_call_bufs` at its index.
///   The first delta for an index emits a [`ChunkEvent::ToolCallChunk`] with
///   an empty `delta`; every non-empty argument fragment emits another one
///   carrying that fragment.
///
/// The id and the function name of a tool call are filled in from a later
/// delta if they were still empty, and are never overwritten once set.
///
/// Events are returned in the order reasoning, content, tool calls.
pub(crate) fn apply_chunk_delta(
    data: &mut StreamingData,
    chunk: crate::raw::ChatCompletionChunk,
) -> Vec<ChunkEvent> {
    let Some(choice) = chunk.choices.into_iter().next() else {
        return Vec::new();
    };
    let delta = choice.delta;
    let mut events = Vec::new();

    if let Some(reasoning) = delta.reasoning_content
        && !reasoning.is_empty()
    {
        data.reasoning_buf.push_str(&reasoning);
        events.push(ChunkEvent::ReasoningToken(reasoning));
    }

    if let Some(content) = delta.content
        && !content.is_empty()
    {
        data.content_buf.push_str(&content);
        events.push(ChunkEvent::Token(content));
    }

    for dtc in delta.tool_calls.into_iter().flatten() {
        let idx = dtc.index as usize;
        // Grow the slot table so the index reported by the server is addressable.
        if data.tool_call_bufs.len() <= idx {
            data.tool_call_bufs.resize_with(idx + 1, || None);
        }

        // First sighting of this index: create the slot and announce the call.
        let partial = data.tool_call_bufs[idx].get_or_insert_with(|| {
            let id = dtc.id.clone().unwrap_or_default();
            let name = dtc
                .function
                .as_ref()
                .and_then(|f| f.name.clone())
                .unwrap_or_default();
            events.push(ChunkEvent::ToolCallChunk {
                id: id.clone(),
                name: name.clone(),
                delta: String::new(),
                index: idx as u32,
            });
            PartialToolCall {
                id,
                name,
                arguments: String::new(),
            }
        });

        // Back-fill the id if it only shows up after the first delta.
        if let Some(id) = dtc.id
            && partial.id.is_empty()
        {
            partial.id = id;
        }

        if let Some(func) = dtc.function {
            // Same for the function name, so a late name is not lost.
            if let Some(name) = func.name
                && partial.name.is_empty()
            {
                partial.name = name;
            }
            if let Some(args) = func.arguments
                && !args.is_empty()
            {
                partial.arguments.push_str(&args);
                events.push(ChunkEvent::ToolCallChunk {
                    id: partial.id.clone(),
                    name: partial.name.clone(),
                    delta: args,
                    index: idx as u32,
                });
            }
        }
    }

    events
}