use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, StreamExt};
use super::executor::{
ChunkEvent, ConnectFuture, ExecFuture, FetchFuture, StreamingData, SummarizeFuture,
apply_chunk_delta, connect_stream, execute_tools, fetch_response, finalize_stream,
run_summarize,
};
use crate::agent::agent_core::{AgentEvent, DeepseekAgent, ToolCallChunk, ToolCallResult};
use crate::error::ApiError;
pub struct AgentStream {
agent: Option<DeepseekAgent>,
state: AgentStreamState,
pending_events: VecDeque<AgentEvent>,
}
pub(crate) enum AgentStreamState {
Idle,
Summarizing(SummarizeFuture),
FetchingResponse(FetchFuture),
ConnectingStream(ConnectFuture),
StreamingChunks(Box<StreamingData>),
YieldingToolCalls {
pending: VecDeque<crate::raw::request::message::ToolCall>,
raw: Vec<crate::raw::request::message::ToolCall>,
from_streaming: bool,
},
ExecutingTools(ExecFuture),
YieldingToolResults { pending: VecDeque<ToolCallResult> },
Done,
}
impl AgentStream {
pub fn new(agent: DeepseekAgent) -> Self {
Self {
agent: Some(agent),
state: AgentStreamState::Idle,
pending_events: VecDeque::new(),
}
}
pub fn into_agent(self) -> Option<DeepseekAgent> {
match self.state {
AgentStreamState::StreamingChunks(data) => Some(data.agent),
_ => self.agent,
}
}
}
impl Stream for AgentStream {
type Item = Result<AgentEvent, ApiError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(ev) = this.pending_events.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
if matches!(this.state, AgentStreamState::StreamingChunks(_)) {
let mut data = match std::mem::replace(&mut this.state, AgentStreamState::Done) {
AgentStreamState::StreamingChunks(d) => d,
_ => unreachable!(),
};
match data.stream.poll_next_unpin(cx) {
Poll::Pending => {
this.state = AgentStreamState::StreamingChunks(data);
return Poll::Pending;
}
Poll::Ready(Some(Ok(chunk))) => {
let mut events = apply_chunk_delta(&mut data, chunk);
this.state = AgentStreamState::StreamingChunks(data);
if !events.is_empty() {
for extra in events.drain(1..) {
this.pending_events.push_back(match extra {
ChunkEvent::Token(t) => AgentEvent::Token(t),
ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
ChunkEvent::ToolCallChunk { id, name, delta, index } =>
AgentEvent::ToolCall(ToolCallChunk { id, name, delta, index }),
});
}
let ev = events.swap_remove(0);
return Poll::Ready(Some(Ok(match ev {
ChunkEvent::Token(t) => AgentEvent::Token(t),
ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
ChunkEvent::ToolCallChunk {
id,
name,
delta,
index,
} => AgentEvent::ToolCall(ToolCallChunk {
id,
name,
delta,
index,
}),
})));
}
continue;
}
Poll::Ready(Some(Err(e))) => {
this.agent = Some(data.agent);
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
let raw_tool_calls = finalize_stream(&mut data);
if raw_tool_calls.is_empty() {
this.agent = Some(data.agent);
this.state = AgentStreamState::Done;
return Poll::Ready(None);
}
this.agent = Some(data.agent);
this.state = AgentStreamState::YieldingToolCalls {
pending: VecDeque::new(),
raw: raw_tool_calls,
from_streaming: true,
};
continue;
}
}
}
match &mut this.state {
AgentStreamState::Done => return Poll::Ready(None),
AgentStreamState::Idle => {
let agent = this.agent.as_mut().expect("agent missing in Idle state");
agent.drain_interrupts();
agent.drain_tool_injections();
let agent = this.agent.take().unwrap();
this.state = AgentStreamState::Summarizing(Box::pin(run_summarize(agent)));
}
AgentStreamState::Summarizing(fut) => match fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(agent) => {
this.state = if agent.streaming {
AgentStreamState::ConnectingStream(Box::pin(connect_stream(agent)))
} else {
AgentStreamState::FetchingResponse(Box::pin(fetch_response(agent)))
};
}
},
AgentStreamState::FetchingResponse(fut) => match fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready((Err(e), agent)) => {
this.agent = Some(agent);
this.state = AgentStreamState::Done;
return Poll::Ready(Some(Err(e)));
}
Poll::Ready((Ok(fetch), agent)) => {
this.agent = Some(agent);
if fetch.raw_tool_calls.is_empty() {
this.state = AgentStreamState::Done;
if let Some(reasoning) = fetch.reasoning_content {
this.pending_events
.push_back(AgentEvent::ReasoningToken(reasoning));
}
if let Some(text) = fetch.content {
this.pending_events.push_back(AgentEvent::Token(text));
}
continue;
}
let maybe_text = fetch.content.map(AgentEvent::Token);
let pending = fetch
.raw_tool_calls
.iter()
.cloned()
.collect::<VecDeque<_>>();
this.state = AgentStreamState::YieldingToolCalls {
pending,
raw: fetch.raw_tool_calls,
from_streaming: false,
};
if let Some(event) = maybe_text {
return Poll::Ready(Some(Ok(event)));
}
continue;
}
},
AgentStreamState::ConnectingStream(fut) => match fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready((Err(e), agent)) => {
this.agent = Some(agent);
this.state = AgentStreamState::Done;
return Poll::Ready(Some(Err(e)));
}
Poll::Ready((Ok(stream), agent)) => {
this.state = AgentStreamState::StreamingChunks(Box::new(StreamingData {
stream,
agent,
content_buf: String::new(),
reasoning_buf: String::new(),
tool_call_bufs: Vec::new(),
}));
}
},
AgentStreamState::YieldingToolCalls {
pending,
raw,
from_streaming,
} => {
if !*from_streaming && let Some(tc) = pending.pop_front() {
return Poll::Ready(Some(Ok(AgentEvent::ToolCall(ToolCallChunk {
id: tc.id.clone(),
name: tc.function.name.clone(),
delta: tc.function.arguments.clone(),
index: 0,
}))));
}
let agent = this
.agent
.take()
.expect("agent missing in YieldingToolCalls");
let raw_calls = std::mem::take(raw);
this.state =
AgentStreamState::ExecutingTools(Box::pin(execute_tools(agent, raw_calls)));
}
AgentStreamState::ExecutingTools(fut) => match fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready((tools_result, agent)) => {
this.agent = Some(agent);
this.state = AgentStreamState::YieldingToolResults {
pending: tools_result.results.into_iter().collect(),
};
}
},
AgentStreamState::YieldingToolResults { pending } => {
if let Some(result) = pending.pop_front() {
return Poll::Ready(Some(Ok(AgentEvent::ToolResult(result))));
}
this.state = AgentStreamState::Idle;
}
AgentStreamState::StreamingChunks(_) => unreachable!(),
}
}
}
}