ds_api/agent/stream.rs
1//! Agent streaming state machine.
2//!
3//! This module is responsible *only* for scheduling and polling — it does not
4//! contain any business logic. All "do actual work" functions live in
5//! `executor`:
6//!
7//! ```text
8//! AgentStream::poll_next
9//! │
//! ├─ Idle → drain interrupts, create (not spawn) the run_summarize future
//! ├─ Summarizing → poll future → ConnectingStream | FetchingResponse
//! ├─ FetchingResponse → poll future → YieldingToolCalls | Done (yield ReasoningToken / Token)
//! ├─ ConnectingStream → poll future → StreamingChunks
//! ├─ StreamingChunks → poll inner stream → yield Token / ReasoningToken / ToolCall chunk | YieldingToolCalls | Done
15//! ├─ YieldingToolCalls → drain queue → ExecutingTools (yield ToolCall per item)
16//! ├─ ExecutingTools → poll future → YieldingToolResults
17//! ├─ YieldingToolResults → drain queue → Idle (yield ToolResult per item)
18//! └─ Done → Poll::Ready(None)
19//! ```
20
21use std::collections::VecDeque;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use futures::{Stream, StreamExt};
26
27use super::executor::{
28 ChunkEvent, ConnectFuture, ExecFuture, FetchFuture, StreamingData, SummarizeFuture,
29 apply_chunk_delta, connect_stream, execute_tools, fetch_response, finalize_stream,
30 run_summarize,
31};
32use crate::agent::agent_core::{AgentEvent, DeepseekAgent, ToolCallChunk, ToolCallResult};
33use crate::error::ApiError;
34
35// ── State machine ─────────────────────────────────────────────────────────────
36
37/// Drives an agent through one or more API turns, tool-execution rounds, and
38/// summarization passes, emitting [`AgentEvent`]s as a [`Stream`].
39///
40/// Obtain one by calling [`DeepseekAgent::chat`][crate::agent::DeepseekAgent::chat].
41/// Collect it with any `futures::StreamExt` combinator or `while let Some(…)`.
42///
43/// # Example
44///
45/// ```no_run
46/// use futures::StreamExt;
47/// use ds_api::{DeepseekAgent, AgentEvent};
48///
49/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
50/// let mut stream = DeepseekAgent::new("sk-...")
51/// .with_streaming()
52/// .chat("What is 2 + 2?");
53///
54/// while let Some(event) = stream.next().await {
55/// match event? {
56/// AgentEvent::Token(text) => print!("{text}"),
57/// AgentEvent::ToolCall(c) => print!("{}", c.delta),
58/// AgentEvent::ToolResult(res) => println!("[result: {}]", res.result),
59/// AgentEvent::ReasoningToken(text) => print!("{text}"),
60/// }
61/// }
62/// # Ok(())
63/// # }
64/// ```
65pub struct AgentStream {
66 /// The agent is held here whenever no future has taken ownership of it.
67 agent: Option<DeepseekAgent>,
68 state: AgentStreamState,
69 /// Small queue for cases where one logical response produces multiple events
70 /// (e.g. non-streaming deepseek-reasoner: ReasoningToken then Token).
71 pending_events: VecDeque<AgentEvent>,
72}
73
74/// Every variant is self-contained: it either holds the agent directly or stores
75/// a future that will return the agent when it resolves.
76pub(crate) enum AgentStreamState {
77 /// Waiting to start (or restart after tool results are delivered).
78 Idle,
79 /// Running `maybe_summarize` before the next API turn.
80 Summarizing(SummarizeFuture),
81 /// Awaiting a non-streaming API response.
82 FetchingResponse(FetchFuture),
83 /// Awaiting the initial SSE connection.
84 ConnectingStream(ConnectFuture),
85 /// Polling an active SSE stream chunk-by-chunk.
86 StreamingChunks(Box<StreamingData>),
87 /// Yielding individual `ToolCall` events before execution starts.
88 /// `from_streaming`: if true, events were already emitted as chunks during
89 /// [`StreamingChunks`] — skip emitting and go straight to [`ExecutingTools`].
90 YieldingToolCalls {
91 pending: VecDeque<crate::raw::request::message::ToolCall>,
92 raw: Vec<crate::raw::request::message::ToolCall>,
93 from_streaming: bool,
94 },
95 /// Awaiting parallel/sequential tool execution.
96 ExecutingTools(ExecFuture),
97 /// Yielding individual `ToolResult` events after execution completes.
98 YieldingToolResults { pending: VecDeque<ToolCallResult> },
99 /// Terminal state — the stream will never produce another item.
100 Done,
101}
102
103// ── Constructor / accessor ────────────────────────────────────────────────────
104
105impl AgentStream {
106 /// Wrap an agent and start in the `Idle` state.
107 pub fn new(agent: DeepseekAgent) -> Self {
108 Self {
109 agent: Some(agent),
110 state: AgentStreamState::Idle,
111 pending_events: VecDeque::new(),
112 }
113 }
114
115 /// Consume the stream and return the agent.
116 ///
117 /// If the stream finished normally (or was dropped mid-stream), the agent is
118 /// returned so callers can continue the conversation without constructing a
119 /// new one.
120 ///
121 /// Returns `None` only if the agent is currently owned by an in-progress
122 /// future (i.e. the stream was dropped mid-poll, which is very unusual).
123 pub fn into_agent(self) -> Option<DeepseekAgent> {
124 match self.state {
125 AgentStreamState::StreamingChunks(data) => Some(data.agent),
126 _ => self.agent,
127 }
128 }
129}
130
131// ── Stream implementation ─────────────────────────────────────────────────────
132
133impl Stream for AgentStream {
134 type Item = Result<AgentEvent, ApiError>;
135
136 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 let this = self.get_mut();
138
139 loop {
140 // Drain any events queued by a previous iteration before polling state.
141 if let Some(ev) = this.pending_events.pop_front() {
142 return Poll::Ready(Some(Ok(ev)));
143 }
144
145 // ── StreamingChunks is handled first to avoid borrow-checker
146 // conflicts: we need to both poll the inner stream *and* replace
147 // `this.state`, which requires owning the data.
148 if matches!(this.state, AgentStreamState::StreamingChunks(_)) {
149 let mut data = match std::mem::replace(&mut this.state, AgentStreamState::Done) {
150 AgentStreamState::StreamingChunks(d) => d,
151 _ => unreachable!(),
152 };
153
154 match data.stream.poll_next_unpin(cx) {
155 Poll::Pending => {
156 this.state = AgentStreamState::StreamingChunks(data);
157 return Poll::Pending;
158 }
159
160 Poll::Ready(Some(Ok(chunk))) => {
161 let mut events = apply_chunk_delta(&mut data, chunk);
162 this.state = AgentStreamState::StreamingChunks(data);
163 // Drain the first event (if any); remaining are lost only
164 // if multiple fire per chunk, which never happens in practice.
165 if !events.is_empty() {
166 let ev = events.swap_remove(0);
167 return Poll::Ready(Some(Ok(match ev {
168 ChunkEvent::Token(t) => AgentEvent::Token(t),
169 ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
170 ChunkEvent::ToolCallChunk {
171 id,
172 name,
173 delta,
174 index,
175 } => AgentEvent::ToolCall(ToolCallChunk {
176 id,
177 name,
178 delta,
179 index,
180 }),
181 })));
182 }
183 continue;
184 }
185
186 Poll::Ready(Some(Err(e))) => {
187 // Stream errored — salvage the agent and terminate.
188 this.agent = Some(data.agent);
189 // state stays Done (set above via mem::replace)
190 return Poll::Ready(Some(Err(e)));
191 }
192
193 Poll::Ready(None) => {
194 // SSE stream ended — assemble full tool calls from buffers.
195 let raw_tool_calls = finalize_stream(&mut data);
196
197 if raw_tool_calls.is_empty() {
198 this.agent = Some(data.agent);
199 this.state = AgentStreamState::Done;
200 return Poll::Ready(None);
201 }
202
203 this.agent = Some(data.agent);
204 this.state = AgentStreamState::YieldingToolCalls {
205 pending: VecDeque::new(),
206 raw: raw_tool_calls,
207 from_streaming: true,
208 };
209 continue;
210 }
211 }
212 }
213
214 // ── All other states ──────────────────────────────────────────────
215 match &mut this.state {
216 AgentStreamState::Done => return Poll::Ready(None),
217
218 AgentStreamState::Idle => {
219 let agent = this.agent.as_mut().expect("agent missing in Idle state");
220 agent.drain_interrupts();
221 let agent = this.agent.take().unwrap();
222 this.state = AgentStreamState::Summarizing(Box::pin(run_summarize(agent)));
223 }
224
225 AgentStreamState::Summarizing(fut) => match fut.as_mut().poll(cx) {
226 Poll::Pending => return Poll::Pending,
227 Poll::Ready(agent) => {
228 this.state = if agent.streaming {
229 AgentStreamState::ConnectingStream(Box::pin(connect_stream(agent)))
230 } else {
231 AgentStreamState::FetchingResponse(Box::pin(fetch_response(agent)))
232 };
233 }
234 },
235
236 AgentStreamState::FetchingResponse(fut) => match fut.as_mut().poll(cx) {
237 Poll::Pending => return Poll::Pending,
238 Poll::Ready((Err(e), agent)) => {
239 this.agent = Some(agent);
240 this.state = AgentStreamState::Done;
241 return Poll::Ready(Some(Err(e)));
242 }
243 Poll::Ready((Ok(fetch), agent)) => {
244 this.agent = Some(agent);
245
246 if fetch.raw_tool_calls.is_empty() {
247 this.state = AgentStreamState::Done;
248 // Queue both events (reasoning then content) so neither is lost.
249 if let Some(reasoning) = fetch.reasoning_content {
250 this.pending_events
251 .push_back(AgentEvent::ReasoningToken(reasoning));
252 }
253 if let Some(text) = fetch.content {
254 this.pending_events.push_back(AgentEvent::Token(text));
255 }
256 // The pending_events drain at the top of the loop will emit them.
257 continue;
258 }
259
260 // Yield any text content before transitioning.
261 let maybe_text = fetch.content.map(AgentEvent::Token);
262 let pending = fetch
263 .raw_tool_calls
264 .iter()
265 .cloned()
266 .collect::<VecDeque<_>>();
267 this.state = AgentStreamState::YieldingToolCalls {
268 pending,
269 raw: fetch.raw_tool_calls,
270 from_streaming: false,
271 };
272
273 if let Some(event) = maybe_text {
274 return Poll::Ready(Some(Ok(event)));
275 }
276 continue;
277 }
278 },
279
280 AgentStreamState::ConnectingStream(fut) => match fut.as_mut().poll(cx) {
281 Poll::Pending => return Poll::Pending,
282 Poll::Ready((Err(e), agent)) => {
283 this.agent = Some(agent);
284 this.state = AgentStreamState::Done;
285 return Poll::Ready(Some(Err(e)));
286 }
287 Poll::Ready((Ok(stream), agent)) => {
288 this.state = AgentStreamState::StreamingChunks(Box::new(StreamingData {
289 stream,
290 agent,
291 content_buf: String::new(),
292 reasoning_buf: String::new(),
293 tool_call_bufs: Vec::new(),
294 }));
295 // Loop back to hit the StreamingChunks branch.
296 }
297 },
298
299 AgentStreamState::YieldingToolCalls {
300 pending,
301 raw,
302 from_streaming,
303 } => {
304 if !*from_streaming && let Some(tc) = pending.pop_front() {
305 return Poll::Ready(Some(Ok(AgentEvent::ToolCall(ToolCallChunk {
306 id: tc.id.clone(),
307 name: tc.function.name.clone(),
308 delta: tc.function.arguments.clone(),
309 index: 0,
310 }))));
311 }
312 // All events yielded (or streaming — already emitted as chunks).
313 let agent = this
314 .agent
315 .take()
316 .expect("agent missing in YieldingToolCalls");
317 let raw_calls = std::mem::take(raw);
318 this.state =
319 AgentStreamState::ExecutingTools(Box::pin(execute_tools(agent, raw_calls)));
320 }
321
322 AgentStreamState::ExecutingTools(fut) => match fut.as_mut().poll(cx) {
323 Poll::Pending => return Poll::Pending,
324 Poll::Ready((tools_result, agent)) => {
325 this.agent = Some(agent);
326 this.state = AgentStreamState::YieldingToolResults {
327 pending: tools_result.results.into_iter().collect(),
328 };
329 }
330 },
331
332 AgentStreamState::YieldingToolResults { pending } => {
333 if let Some(result) = pending.pop_front() {
334 return Poll::Ready(Some(Ok(AgentEvent::ToolResult(result))));
335 }
336 // All results delivered — loop back for the next API turn.
337 this.state = AgentStreamState::Idle;
338 }
339
340 // Handled in the dedicated block above; this arm is unreachable
341 // but the compiler cannot verify that without exhaustiveness help.
342 AgentStreamState::StreamingChunks(_) => unreachable!(),
343 }
344 }
345 }
346}