ds_api/agent/stream.rs
1//! Agent streaming state machine.
2//!
3//! This module is responsible *only* for scheduling and polling — it does not
4//! contain any business logic. All "do actual work" functions live in
5//! `executor`:
6//!
7//! ```text
8//! AgentStream::poll_next
9//! │
//! ├─ Idle → drain interrupts, spawn run_summarize future
//! ├─ Summarizing → poll future → ConnectingStream | FetchingResponse
//! ├─ FetchingResponse → poll future → YieldingToolCalls | Done (yield ReasoningToken / Token)
//! ├─ ConnectingStream → poll future → StreamingChunks
//! ├─ StreamingChunks → poll inner stream → yield Token / ReasoningToken / ToolCall | YieldingToolCalls | Done
//! ├─ YieldingToolCalls → drain queue → ExecutingTools (yield ToolCall per item; non-streaming only)
16//! ├─ ExecutingTools → poll future → YieldingToolResults
17//! ├─ YieldingToolResults → drain queue → Idle (yield ToolResult per item)
18//! └─ Done → Poll::Ready(None)
19//! ```
20
21use std::collections::VecDeque;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use futures::{Stream, StreamExt};
26
27use super::executor::{
28 ChunkEvent, ConnectFuture, ExecFuture, FetchFuture, StreamingData, SummarizeFuture,
29 apply_chunk_delta, connect_stream, execute_tools, fetch_response, finalize_stream,
30 run_summarize,
31};
32use crate::agent::agent_core::{AgentEvent, DeepseekAgent, ToolCallChunk, ToolCallResult};
33use crate::error::ApiError;
34
35// ── State machine ─────────────────────────────────────────────────────────────
36
37/// Drives an agent through one or more API turns, tool-execution rounds, and
38/// summarization passes, emitting [`AgentEvent`]s as a [`Stream`].
39///
40/// Obtain one by calling [`DeepseekAgent::chat`][crate::agent::DeepseekAgent::chat].
41/// Collect it with any `futures::StreamExt` combinator or `while let Some(…)`.
42///
43/// # Example
44///
45/// ```no_run
46/// use futures::StreamExt;
47/// use ds_api::{DeepseekAgent, AgentEvent};
48///
49/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
50/// let mut stream = DeepseekAgent::new("sk-...")
51/// .with_streaming()
52/// .chat("What is 2 + 2?");
53///
54/// while let Some(event) = stream.next().await {
55/// match event? {
56/// AgentEvent::Token(text) => print!("{text}"),
57/// AgentEvent::ToolCall(c) => print!("{}", c.delta),
58/// AgentEvent::ToolResult(res) => println!("[result: {}]", res.result),
59/// AgentEvent::ReasoningToken(text) => print!("{text}"),
60/// }
61/// }
62/// # Ok(())
63/// # }
64/// ```
65pub struct AgentStream {
66 /// The agent is held here whenever no future has taken ownership of it.
67 agent: Option<DeepseekAgent>,
68 state: AgentStreamState,
69 /// Small queue for cases where one logical response produces multiple events
70 /// (e.g. non-streaming deepseek-reasoner: ReasoningToken then Token).
71 pending_events: VecDeque<AgentEvent>,
72}
73
74/// Every variant is self-contained: it either holds the agent directly or stores
75/// a future that will return the agent when it resolves.
76pub(crate) enum AgentStreamState {
77 /// Waiting to start (or restart after tool results are delivered).
78 Idle,
79 /// Running `maybe_summarize` before the next API turn.
80 Summarizing(SummarizeFuture),
81 /// Awaiting a non-streaming API response.
82 FetchingResponse(FetchFuture),
83 /// Awaiting the initial SSE connection.
84 ConnectingStream(ConnectFuture),
85 /// Polling an active SSE stream chunk-by-chunk.
86 StreamingChunks(Box<StreamingData>),
87 /// Yielding individual `ToolCall` events before execution starts.
88 /// `from_streaming`: if true, events were already emitted as chunks during
89 /// [`StreamingChunks`] — skip emitting and go straight to [`ExecutingTools`].
90 YieldingToolCalls {
91 pending: VecDeque<crate::raw::request::message::ToolCall>,
92 raw: Vec<crate::raw::request::message::ToolCall>,
93 from_streaming: bool,
94 },
95 /// Awaiting parallel/sequential tool execution.
96 ExecutingTools(ExecFuture),
97 /// Yielding individual `ToolResult` events after execution completes.
98 YieldingToolResults { pending: VecDeque<ToolCallResult> },
99 /// Terminal state — the stream will never produce another item.
100 Done,
101}
102
103// ── Constructor / accessor ────────────────────────────────────────────────────
104
105impl AgentStream {
106 /// Wrap an agent and start in the `Idle` state.
107 pub fn new(agent: DeepseekAgent) -> Self {
108 Self {
109 agent: Some(agent),
110 state: AgentStreamState::Idle,
111 pending_events: VecDeque::new(),
112 }
113 }
114
115 /// Consume the stream and return the agent.
116 ///
117 /// If the stream finished normally (or was dropped mid-stream), the agent is
118 /// returned so callers can continue the conversation without constructing a
119 /// new one.
120 ///
121 /// Returns `None` only if the agent is currently owned by an in-progress
122 /// future (i.e. the stream was dropped mid-poll, which is very unusual).
123 pub fn into_agent(self) -> Option<DeepseekAgent> {
124 match self.state {
125 AgentStreamState::StreamingChunks(data) => Some(data.agent),
126 _ => self.agent,
127 }
128 }
129}
130
131// ── Stream implementation ─────────────────────────────────────────────────────
132
133impl Stream for AgentStream {
134 type Item = Result<AgentEvent, ApiError>;
135
136 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 let this = self.get_mut();
138
139 loop {
140 // Drain any events queued by a previous iteration before polling state.
141 if let Some(ev) = this.pending_events.pop_front() {
142 return Poll::Ready(Some(Ok(ev)));
143 }
144
145 // ── StreamingChunks is handled first to avoid borrow-checker
146 // conflicts: we need to both poll the inner stream *and* replace
147 // `this.state`, which requires owning the data.
148 if matches!(this.state, AgentStreamState::StreamingChunks(_)) {
149 let mut data = match std::mem::replace(&mut this.state, AgentStreamState::Done) {
150 AgentStreamState::StreamingChunks(d) => d,
151 _ => unreachable!(),
152 };
153
154 match data.stream.poll_next_unpin(cx) {
155 Poll::Pending => {
156 this.state = AgentStreamState::StreamingChunks(data);
157 return Poll::Pending;
158 }
159
160 Poll::Ready(Some(Ok(chunk))) => {
161 let mut events = apply_chunk_delta(&mut data, chunk);
162 this.state = AgentStreamState::StreamingChunks(data);
163 // Drain the first event (if any); remaining are lost only
164 // if multiple fire per chunk, which never happens in practice.
165 if !events.is_empty() {
166 let ev = events.swap_remove(0);
167 return Poll::Ready(Some(Ok(match ev {
168 ChunkEvent::Token(t) => AgentEvent::Token(t),
169 ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
170 ChunkEvent::ToolCallChunk { id, name, delta } => {
171 AgentEvent::ToolCall(ToolCallChunk { id, name, delta })
172 }
173 })));
174 }
175 continue;
176 }
177
178 Poll::Ready(Some(Err(e))) => {
179 // Stream errored — salvage the agent and terminate.
180 this.agent = Some(data.agent);
181 // state stays Done (set above via mem::replace)
182 return Poll::Ready(Some(Err(e)));
183 }
184
185 Poll::Ready(None) => {
186 // SSE stream ended — assemble full tool calls from buffers.
187 let raw_tool_calls = finalize_stream(&mut data);
188
189 if raw_tool_calls.is_empty() {
190 this.agent = Some(data.agent);
191 this.state = AgentStreamState::Done;
192 return Poll::Ready(None);
193 }
194
195 this.agent = Some(data.agent);
196 this.state = AgentStreamState::YieldingToolCalls {
197 pending: VecDeque::new(),
198 raw: raw_tool_calls,
199 from_streaming: true,
200 };
201 continue;
202 }
203 }
204 }
205
206 // ── All other states ──────────────────────────────────────────────
207 match &mut this.state {
208 AgentStreamState::Done => return Poll::Ready(None),
209
210 AgentStreamState::Idle => {
211 let agent = this.agent.as_mut().expect("agent missing in Idle state");
212 agent.drain_interrupts();
213 let agent = this.agent.take().unwrap();
214 this.state = AgentStreamState::Summarizing(Box::pin(run_summarize(agent)));
215 }
216
217 AgentStreamState::Summarizing(fut) => match fut.as_mut().poll(cx) {
218 Poll::Pending => return Poll::Pending,
219 Poll::Ready(agent) => {
220 this.state = if agent.streaming {
221 AgentStreamState::ConnectingStream(Box::pin(connect_stream(agent)))
222 } else {
223 AgentStreamState::FetchingResponse(Box::pin(fetch_response(agent)))
224 };
225 }
226 },
227
228 AgentStreamState::FetchingResponse(fut) => match fut.as_mut().poll(cx) {
229 Poll::Pending => return Poll::Pending,
230 Poll::Ready((Err(e), agent)) => {
231 this.agent = Some(agent);
232 this.state = AgentStreamState::Done;
233 return Poll::Ready(Some(Err(e)));
234 }
235 Poll::Ready((Ok(fetch), agent)) => {
236 this.agent = Some(agent);
237
238 if fetch.raw_tool_calls.is_empty() {
239 this.state = AgentStreamState::Done;
240 // Queue both events (reasoning then content) so neither is lost.
241 if let Some(reasoning) = fetch.reasoning_content {
242 this.pending_events
243 .push_back(AgentEvent::ReasoningToken(reasoning));
244 }
245 if let Some(text) = fetch.content {
246 this.pending_events.push_back(AgentEvent::Token(text));
247 }
248 // The pending_events drain at the top of the loop will emit them.
249 continue;
250 }
251
252 // Yield any text content before transitioning.
253 let maybe_text = fetch.content.map(AgentEvent::Token);
254 let pending = fetch
255 .raw_tool_calls
256 .iter()
257 .cloned()
258 .collect::<VecDeque<_>>();
259 this.state = AgentStreamState::YieldingToolCalls {
260 pending,
261 raw: fetch.raw_tool_calls,
262 from_streaming: false,
263 };
264
265 if let Some(event) = maybe_text {
266 return Poll::Ready(Some(Ok(event)));
267 }
268 continue;
269 }
270 },
271
272 AgentStreamState::ConnectingStream(fut) => match fut.as_mut().poll(cx) {
273 Poll::Pending => return Poll::Pending,
274 Poll::Ready((Err(e), agent)) => {
275 this.agent = Some(agent);
276 this.state = AgentStreamState::Done;
277 return Poll::Ready(Some(Err(e)));
278 }
279 Poll::Ready((Ok(stream), agent)) => {
280 this.state = AgentStreamState::StreamingChunks(Box::new(StreamingData {
281 stream,
282 agent,
283 content_buf: String::new(),
284 reasoning_buf: String::new(),
285 tool_call_bufs: Vec::new(),
286 }));
287 // Loop back to hit the StreamingChunks branch.
288 }
289 },
290
291 AgentStreamState::YieldingToolCalls { pending, raw, from_streaming } => {
292 if !*from_streaming {
293 if let Some(tc) = pending.pop_front() {
294 return Poll::Ready(Some(Ok(AgentEvent::ToolCall(ToolCallChunk {
295 id: tc.id.clone(),
296 name: tc.function.name.clone(),
297 delta: tc.function.arguments.clone(),
298 }))));
299 }
300 }
301 // All events yielded (or streaming — already emitted as chunks).
302 let agent = this
303 .agent
304 .take()
305 .expect("agent missing in YieldingToolCalls");
306 let raw_calls = std::mem::take(raw);
307 this.state =
308 AgentStreamState::ExecutingTools(Box::pin(execute_tools(agent, raw_calls)));
309 }
310
311 AgentStreamState::ExecutingTools(fut) => match fut.as_mut().poll(cx) {
312 Poll::Pending => return Poll::Pending,
313 Poll::Ready((tools_result, agent)) => {
314 this.agent = Some(agent);
315 this.state = AgentStreamState::YieldingToolResults {
316 pending: tools_result.results.into_iter().collect(),
317 };
318 }
319 },
320
321 AgentStreamState::YieldingToolResults { pending } => {
322 if let Some(result) = pending.pop_front() {
323 return Poll::Ready(Some(Ok(AgentEvent::ToolResult(result))));
324 }
325 // All results delivered — loop back for the next API turn.
326 this.state = AgentStreamState::Idle;
327 }
328
329 // Handled in the dedicated block above; this arm is unreachable
330 // but the compiler cannot verify that without exhaustiveness help.
331 AgentStreamState::StreamingChunks(_) => unreachable!(),
332 }
333 }
334 }
335}