ds_api/agent/stream.rs
1//! Agent streaming state machine.
2//!
3//! This module is responsible *only* for scheduling and polling — it does not
4//! contain any business logic. All "do actual work" functions live in
5//! `executor`:
6//!
7//! ```text
8//! AgentStream::poll_next
9//! │
10//! ├─ Idle → spawn run_summarize future
11//! ├─ Summarizing → poll future → ConnectingStream | FetchingResponse
12//! ├─ FetchingResponse → poll future → YieldingToolCalls | Done (yield Token)
13//! ├─ ConnectingStream → poll future → StreamingChunks
14//! ├─ StreamingChunks → poll inner stream → yield Token | YieldingToolCalls | Done
15//! ├─ YieldingToolCalls → drain queue → ExecutingTools (yield ToolCall per item)
16//! ├─ ExecutingTools → poll future → YieldingToolResults
17//! ├─ YieldingToolResults → drain queue → Idle (yield ToolResult per item)
18//! └─ Done → Poll::Ready(None)
19//! ```
20
21use std::collections::VecDeque;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use futures::{Stream, StreamExt};
26
27use super::executor::{
28 ChunkEvent, ConnectFuture, ExecFuture, FetchFuture, StreamingData, SummarizeFuture,
29 apply_chunk_delta, connect_stream, execute_tools, fetch_response, finalize_stream,
30 run_summarize,
31};
32use crate::agent::agent_core::{AgentEvent, DeepseekAgent, ToolCallChunk, ToolCallResult};
33use crate::error::ApiError;
34
35// ── State machine ─────────────────────────────────────────────────────────────
36
37/// Drives an agent through one or more API turns, tool-execution rounds, and
38/// summarization passes, emitting [`AgentEvent`]s as a [`Stream`].
39///
40/// Obtain one by calling [`DeepseekAgent::chat`][crate::agent::DeepseekAgent::chat].
41/// Collect it with any `futures::StreamExt` combinator or `while let Some(…)`.
42///
43/// # Example
44///
45/// ```no_run
46/// use futures::StreamExt;
47/// use ds_api::{DeepseekAgent, AgentEvent};
48///
49/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
50/// let mut stream = DeepseekAgent::new("sk-...")
51/// .with_streaming()
52/// .chat("What is 2 + 2?");
53///
54/// while let Some(event) = stream.next().await {
55/// match event? {
56/// AgentEvent::Token(text) => print!("{text}"),
57/// AgentEvent::ToolCall(c) => print!("{}", c.delta),
58/// AgentEvent::ToolResult(res) => println!("[result: {}]", res.result),
59/// AgentEvent::ReasoningToken(text) => print!("{text}"),
60/// }
61/// }
62/// # Ok(())
63/// # }
64/// ```
65pub struct AgentStream {
66 /// The agent is held here whenever no future has taken ownership of it.
67 agent: Option<DeepseekAgent>,
68 state: AgentStreamState,
69 /// Small queue for cases where one logical response produces multiple events
70 /// (e.g. non-streaming deepseek-reasoner: ReasoningToken then Token).
71 pending_events: VecDeque<AgentEvent>,
72}
73
74/// Every variant is self-contained: it either holds the agent directly or stores
75/// a future that will return the agent when it resolves.
76pub(crate) enum AgentStreamState {
77 /// Waiting to start (or restart after tool results are delivered).
78 Idle,
79 /// Running `maybe_summarize` before the next API turn.
80 Summarizing(SummarizeFuture),
81 /// Awaiting a non-streaming API response.
82 FetchingResponse(FetchFuture),
83 /// Awaiting the initial SSE connection.
84 ConnectingStream(ConnectFuture),
85 /// Polling an active SSE stream chunk-by-chunk.
86 StreamingChunks(Box<StreamingData>),
87 /// Yielding individual `ToolCall` events before execution starts.
88 /// `from_streaming`: if true, events were already emitted as chunks during
89 /// [`StreamingChunks`] — skip emitting and go straight to [`ExecutingTools`].
90 YieldingToolCalls {
91 pending: VecDeque<crate::raw::request::message::ToolCall>,
92 raw: Vec<crate::raw::request::message::ToolCall>,
93 from_streaming: bool,
94 },
95 /// Awaiting parallel/sequential tool execution.
96 ExecutingTools(ExecFuture),
97 /// Yielding individual `ToolResult` events after execution completes.
98 YieldingToolResults { pending: VecDeque<ToolCallResult> },
99 /// Terminal state — the stream will never produce another item.
100 Done,
101}
102
103// ── Constructor / accessor ────────────────────────────────────────────────────
104
105impl AgentStream {
106 /// Wrap an agent and start in the `Idle` state.
107 pub fn new(agent: DeepseekAgent) -> Self {
108 Self {
109 agent: Some(agent),
110 state: AgentStreamState::Idle,
111 pending_events: VecDeque::new(),
112 }
113 }
114
115 /// Consume the stream and return the agent.
116 ///
117 /// If the stream finished normally (or was dropped mid-stream), the agent is
118 /// returned so callers can continue the conversation without constructing a
119 /// new one.
120 ///
121 /// Returns `None` only if the agent is currently owned by an in-progress
122 /// future (i.e. the stream was dropped mid-poll, which is very unusual).
123 pub fn into_agent(self) -> Option<DeepseekAgent> {
124 match self.state {
125 AgentStreamState::StreamingChunks(data) => Some(data.agent),
126 _ => self.agent,
127 }
128 }
129}
130
131// ── Stream implementation ─────────────────────────────────────────────────────
132
133impl Stream for AgentStream {
134 type Item = Result<AgentEvent, ApiError>;
135
136 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 let this = self.get_mut();
138
139 loop {
140 // Drain any events queued by a previous iteration before polling state.
141 if let Some(ev) = this.pending_events.pop_front() {
142 return Poll::Ready(Some(Ok(ev)));
143 }
144
145 // ── StreamingChunks is handled first to avoid borrow-checker
146 // conflicts: we need to both poll the inner stream *and* replace
147 // `this.state`, which requires owning the data.
148 if matches!(this.state, AgentStreamState::StreamingChunks(_)) {
149 let mut data = match std::mem::replace(&mut this.state, AgentStreamState::Done) {
150 AgentStreamState::StreamingChunks(d) => d,
151 _ => unreachable!(),
152 };
153
154 match data.stream.poll_next_unpin(cx) {
155 Poll::Pending => {
156 this.state = AgentStreamState::StreamingChunks(data);
157 return Poll::Pending;
158 }
159
160 Poll::Ready(Some(Ok(chunk))) => {
161 let mut events = apply_chunk_delta(&mut data, chunk);
162 this.state = AgentStreamState::StreamingChunks(data);
163 // Queue all events; drain them one per poll via pending_events.
164 if !events.is_empty() {
165 // Push tail events into the pending queue (they will be
166 // returned on subsequent poll_next calls before we poll
167 // the underlying stream again).
168 for extra in events.drain(1..) {
169 this.pending_events.push_back(match extra {
170 ChunkEvent::Token(t) => AgentEvent::Token(t),
171 ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
172 ChunkEvent::ToolCallChunk { id, name, delta, index } =>
173 AgentEvent::ToolCall(ToolCallChunk { id, name, delta, index }),
174 });
175 }
176 let ev = events.swap_remove(0);
177 return Poll::Ready(Some(Ok(match ev {
178 ChunkEvent::Token(t) => AgentEvent::Token(t),
179 ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
180 ChunkEvent::ToolCallChunk {
181 id,
182 name,
183 delta,
184 index,
185 } => AgentEvent::ToolCall(ToolCallChunk {
186 id,
187 name,
188 delta,
189 index,
190 }),
191 })));
192 }
193 continue;
194 }
195
196 Poll::Ready(Some(Err(e))) => {
197 // Stream errored — salvage the agent and terminate.
198 this.agent = Some(data.agent);
199 // state stays Done (set above via mem::replace)
200 return Poll::Ready(Some(Err(e)));
201 }
202
203 Poll::Ready(None) => {
204 // SSE stream ended — assemble full tool calls from buffers.
205 let raw_tool_calls = finalize_stream(&mut data);
206
207 if raw_tool_calls.is_empty() {
208 this.agent = Some(data.agent);
209 this.state = AgentStreamState::Done;
210 return Poll::Ready(None);
211 }
212
213 this.agent = Some(data.agent);
214 this.state = AgentStreamState::YieldingToolCalls {
215 pending: VecDeque::new(),
216 raw: raw_tool_calls,
217 from_streaming: true,
218 };
219 continue;
220 }
221 }
222 }
223
224 // ── All other states ──────────────────────────────────────────────
225 match &mut this.state {
226 AgentStreamState::Done => return Poll::Ready(None),
227
228 AgentStreamState::Idle => {
229 let agent = this.agent.as_mut().expect("agent missing in Idle state");
230 agent.drain_interrupts();
231 agent.drain_tool_injections();
232 let agent = this.agent.take().unwrap();
233 this.state = AgentStreamState::Summarizing(Box::pin(run_summarize(agent)));
234 }
235
236 AgentStreamState::Summarizing(fut) => match fut.as_mut().poll(cx) {
237 Poll::Pending => return Poll::Pending,
238 Poll::Ready(agent) => {
239 this.state = if agent.streaming {
240 AgentStreamState::ConnectingStream(Box::pin(connect_stream(agent)))
241 } else {
242 AgentStreamState::FetchingResponse(Box::pin(fetch_response(agent)))
243 };
244 }
245 },
246
247 AgentStreamState::FetchingResponse(fut) => match fut.as_mut().poll(cx) {
248 Poll::Pending => return Poll::Pending,
249 Poll::Ready((Err(e), agent)) => {
250 this.agent = Some(agent);
251 this.state = AgentStreamState::Done;
252 return Poll::Ready(Some(Err(e)));
253 }
254 Poll::Ready((Ok(fetch), agent)) => {
255 this.agent = Some(agent);
256
257 if fetch.raw_tool_calls.is_empty() {
258 this.state = AgentStreamState::Done;
259 // Queue both events (reasoning then content) so neither is lost.
260 if let Some(reasoning) = fetch.reasoning_content {
261 this.pending_events
262 .push_back(AgentEvent::ReasoningToken(reasoning));
263 }
264 if let Some(text) = fetch.content {
265 this.pending_events.push_back(AgentEvent::Token(text));
266 }
267 // The pending_events drain at the top of the loop will emit them.
268 continue;
269 }
270
271 // Yield any text content before transitioning.
272 let maybe_text = fetch.content.map(AgentEvent::Token);
273 let pending = fetch
274 .raw_tool_calls
275 .iter()
276 .cloned()
277 .collect::<VecDeque<_>>();
278 this.state = AgentStreamState::YieldingToolCalls {
279 pending,
280 raw: fetch.raw_tool_calls,
281 from_streaming: false,
282 };
283
284 if let Some(event) = maybe_text {
285 return Poll::Ready(Some(Ok(event)));
286 }
287 continue;
288 }
289 },
290
291 AgentStreamState::ConnectingStream(fut) => match fut.as_mut().poll(cx) {
292 Poll::Pending => return Poll::Pending,
293 Poll::Ready((Err(e), agent)) => {
294 this.agent = Some(agent);
295 this.state = AgentStreamState::Done;
296 return Poll::Ready(Some(Err(e)));
297 }
298 Poll::Ready((Ok(stream), agent)) => {
299 this.state = AgentStreamState::StreamingChunks(Box::new(StreamingData {
300 stream,
301 agent,
302 content_buf: String::new(),
303 reasoning_buf: String::new(),
304 tool_call_bufs: Vec::new(),
305 }));
306 // Loop back to hit the StreamingChunks branch.
307 }
308 },
309
310 AgentStreamState::YieldingToolCalls {
311 pending,
312 raw,
313 from_streaming,
314 } => {
315 if !*from_streaming && let Some(tc) = pending.pop_front() {
316 return Poll::Ready(Some(Ok(AgentEvent::ToolCall(ToolCallChunk {
317 id: tc.id.clone(),
318 name: tc.function.name.clone(),
319 delta: tc.function.arguments.clone(),
320 index: 0,
321 }))));
322 }
323 // All events yielded (or streaming — already emitted as chunks).
324 let agent = this
325 .agent
326 .take()
327 .expect("agent missing in YieldingToolCalls");
328 let raw_calls = std::mem::take(raw);
329 this.state =
330 AgentStreamState::ExecutingTools(Box::pin(execute_tools(agent, raw_calls)));
331 }
332
333 AgentStreamState::ExecutingTools(fut) => match fut.as_mut().poll(cx) {
334 Poll::Pending => return Poll::Pending,
335 Poll::Ready((tools_result, agent)) => {
336 this.agent = Some(agent);
337 this.state = AgentStreamState::YieldingToolResults {
338 pending: tools_result.results.into_iter().collect(),
339 };
340 }
341 },
342
343 AgentStreamState::YieldingToolResults { pending } => {
344 if let Some(result) = pending.pop_front() {
345 return Poll::Ready(Some(Ok(AgentEvent::ToolResult(result))));
346 }
347 // All results delivered — loop back for the next API turn.
348 this.state = AgentStreamState::Idle;
349 }
350
351 // Handled in the dedicated block above; this arm is unreachable
352 // but the compiler cannot verify that without exhaustiveness help.
353 AgentStreamState::StreamingChunks(_) => unreachable!(),
354 }
355 }
356 }
357}