Skip to main content

ds_api/agent/
stream.rs

1//! Agent streaming state machine.
2//!
3//! This module is responsible *only* for scheduling and polling — it does not
4//! contain any business logic.  All "do actual work" functions live in
5//! `executor`:
6//!
7//! ```text
8//! AgentStream::poll_next
9//!   │
10//!   ├─ Idle              → spawn run_summarize future
11//!   ├─ Summarizing       → poll future → ConnectingStream | FetchingResponse
12//!   ├─ FetchingResponse  → poll future → YieldingToolCalls | Done  (yield Token)
13//!   ├─ ConnectingStream  → poll future → StreamingChunks
14//!   ├─ StreamingChunks   → poll inner stream → yield Token | YieldingToolCalls | Done
15//!   ├─ YieldingToolCalls → drain queue → ExecutingTools  (yield ToolCall per item)
16//!   ├─ ExecutingTools    → poll future → YieldingToolResults
17//!   ├─ YieldingToolResults → drain queue → Idle  (yield ToolResult per item)
18//!   └─ Done              → Poll::Ready(None)
19//! ```
20
21use std::collections::VecDeque;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use futures::{Stream, StreamExt};
26
27use super::executor::{
28    ChunkEvent, ConnectFuture, ExecFuture, FetchFuture, StreamingData, SummarizeFuture,
29    apply_chunk_delta, connect_stream, execute_tools, fetch_response, finalize_stream,
30    run_summarize,
31};
32use crate::agent::agent_core::{AgentEvent, DeepseekAgent, ToolCallChunk, ToolCallResult};
33use crate::error::ApiError;
34
35// ── State machine ─────────────────────────────────────────────────────────────
36
37/// Drives an agent through one or more API turns, tool-execution rounds, and
38/// summarization passes, emitting [`AgentEvent`]s as a [`Stream`].
39///
40/// Obtain one by calling [`DeepseekAgent::chat`][crate::agent::DeepseekAgent::chat].
41/// Collect it with any `futures::StreamExt` combinator or `while let Some(…)`.
42///
43/// # Example
44///
45/// ```no_run
46/// use futures::StreamExt;
47/// use ds_api::{DeepseekAgent, AgentEvent};
48///
49/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
50/// let mut stream = DeepseekAgent::new("sk-...")
51///     .with_streaming()
52///     .chat("What is 2 + 2?");
53///
54/// while let Some(event) = stream.next().await {
55///     match event? {
56///         AgentEvent::Token(text) => print!("{text}"),
57///         AgentEvent::ToolCall(c) => print!("{}", c.delta),
58///         AgentEvent::ToolResult(res) => println!("[result: {}]", res.result),
59///         AgentEvent::ReasoningToken(text) => print!("{text}"),
60///     }
61/// }
62/// # Ok(())
63/// # }
64/// ```
65pub struct AgentStream {
66    /// The agent is held here whenever no future has taken ownership of it.
67    agent: Option<DeepseekAgent>,
68    state: AgentStreamState,
69    /// Small queue for cases where one logical response produces multiple events
70    /// (e.g. non-streaming deepseek-reasoner: ReasoningToken then Token).
71    pending_events: VecDeque<AgentEvent>,
72}
73
/// Internal state of [`AgentStream`].
///
/// Ownership of the agent moves with the state: while a future-carrying
/// variant (`Summarizing`, `FetchingResponse`, `ConnectingStream`,
/// `ExecutingTools`) is active, the future owns the agent and hands it back
/// when it resolves; in `StreamingChunks` the agent lives inside the boxed
/// [`StreamingData`]; in every other variant the agent is stored in
/// `AgentStream::agent`.
pub(crate) enum AgentStreamState {
    /// Waiting to start (or restart after tool results are delivered).
    Idle,
    /// Running the summarization pass (`run_summarize`) before the next API turn.
    Summarizing(SummarizeFuture),
    /// Awaiting a non-streaming API response.
    FetchingResponse(FetchFuture),
    /// Awaiting the initial SSE connection.
    ConnectingStream(ConnectFuture),
    /// Polling an active SSE stream chunk-by-chunk.
    /// Boxed so the stream, agent and accumulation buffers live behind one pointer.
    StreamingChunks(Box<StreamingData>),
    /// Yielding individual `ToolCall` events before execution starts.
    ///
    /// * `pending` — calls that still have to be emitted as events.
    /// * `raw` — the complete list of calls handed to `execute_tools`.
    /// * `from_streaming` — if true, events were already emitted as chunks during
    ///   [`Self::StreamingChunks`] — skip emitting and go straight to
    ///   [`Self::ExecutingTools`].
    YieldingToolCalls {
        pending: VecDeque<crate::raw::request::message::ToolCall>,
        raw: Vec<crate::raw::request::message::ToolCall>,
        from_streaming: bool,
    },
    /// Awaiting the tool-execution future created by `execute_tools`.
    ExecutingTools(ExecFuture),
    /// Yielding individual `ToolResult` events after execution completes;
    /// once `pending` is empty the machine returns to [`Self::Idle`].
    YieldingToolResults { pending: VecDeque<ToolCallResult> },
    /// Terminal state — the stream will never produce another item.
    Done,
}
102
103// ── Constructor / accessor ────────────────────────────────────────────────────
104
105impl AgentStream {
106    /// Wrap an agent and start in the `Idle` state.
107    pub fn new(agent: DeepseekAgent) -> Self {
108        Self {
109            agent: Some(agent),
110            state: AgentStreamState::Idle,
111            pending_events: VecDeque::new(),
112        }
113    }
114
115    /// Consume the stream and return the agent.
116    ///
117    /// If the stream finished normally (or was dropped mid-stream), the agent is
118    /// returned so callers can continue the conversation without constructing a
119    /// new one.
120    ///
121    /// Returns `None` only if the agent is currently owned by an in-progress
122    /// future (i.e. the stream was dropped mid-poll, which is very unusual).
123    pub fn into_agent(self) -> Option<DeepseekAgent> {
124        match self.state {
125            AgentStreamState::StreamingChunks(data) => Some(data.agent),
126            _ => self.agent,
127        }
128    }
129}
130
131// ── Stream implementation ─────────────────────────────────────────────────────
132
133impl Stream for AgentStream {
134    type Item = Result<AgentEvent, ApiError>;
135
136    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137        let this = self.get_mut();
138
139        loop {
140            // Drain any events queued by a previous iteration before polling state.
141            if let Some(ev) = this.pending_events.pop_front() {
142                return Poll::Ready(Some(Ok(ev)));
143            }
144
145            // ── StreamingChunks is handled first to avoid borrow-checker
146            //    conflicts: we need to both poll the inner stream *and* replace
147            //    `this.state`, which requires owning the data.
148            if matches!(this.state, AgentStreamState::StreamingChunks(_)) {
149                let mut data = match std::mem::replace(&mut this.state, AgentStreamState::Done) {
150                    AgentStreamState::StreamingChunks(d) => d,
151                    _ => unreachable!(),
152                };
153
154                match data.stream.poll_next_unpin(cx) {
155                    Poll::Pending => {
156                        this.state = AgentStreamState::StreamingChunks(data);
157                        return Poll::Pending;
158                    }
159
160                    Poll::Ready(Some(Ok(chunk))) => {
161                        let mut events = apply_chunk_delta(&mut data, chunk);
162                        this.state = AgentStreamState::StreamingChunks(data);
163                        // Queue all events; drain them one per poll via pending_events.
164                        if !events.is_empty() {
165                            // Push tail events into the pending queue (they will be
166                            // returned on subsequent poll_next calls before we poll
167                            // the underlying stream again).
168                            for extra in events.drain(1..) {
169                                this.pending_events.push_back(match extra {
170                                    ChunkEvent::Token(t) => AgentEvent::Token(t),
171                                    ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
172                                    ChunkEvent::ToolCallChunk { id, name, delta, index } =>
173                                        AgentEvent::ToolCall(ToolCallChunk { id, name, delta, index }),
174                                });
175                            }
176                            let ev = events.swap_remove(0);
177                            return Poll::Ready(Some(Ok(match ev {
178                                ChunkEvent::Token(t) => AgentEvent::Token(t),
179                                ChunkEvent::ReasoningToken(t) => AgentEvent::ReasoningToken(t),
180                                ChunkEvent::ToolCallChunk {
181                                    id,
182                                    name,
183                                    delta,
184                                    index,
185                                } => AgentEvent::ToolCall(ToolCallChunk {
186                                    id,
187                                    name,
188                                    delta,
189                                    index,
190                                }),
191                            })));
192                        }
193                        continue;
194                    }
195
196                    Poll::Ready(Some(Err(e))) => {
197                        // Stream errored — salvage the agent and terminate.
198                        this.agent = Some(data.agent);
199                        // state stays Done (set above via mem::replace)
200                        return Poll::Ready(Some(Err(e)));
201                    }
202
203                    Poll::Ready(None) => {
204                        // SSE stream ended — assemble full tool calls from buffers.
205                        let raw_tool_calls = finalize_stream(&mut data);
206
207                        if raw_tool_calls.is_empty() {
208                            this.agent = Some(data.agent);
209                            this.state = AgentStreamState::Done;
210                            return Poll::Ready(None);
211                        }
212
213                        this.agent = Some(data.agent);
214                        this.state = AgentStreamState::YieldingToolCalls {
215                            pending: VecDeque::new(),
216                            raw: raw_tool_calls,
217                            from_streaming: true,
218                        };
219                        continue;
220                    }
221                }
222            }
223
224            // ── All other states ──────────────────────────────────────────────
225            match &mut this.state {
226                AgentStreamState::Done => return Poll::Ready(None),
227
228                AgentStreamState::Idle => {
229                    let agent = this.agent.as_mut().expect("agent missing in Idle state");
230                    agent.drain_interrupts();
231                    agent.drain_tool_injections();
232                    let agent = this.agent.take().unwrap();
233                    this.state = AgentStreamState::Summarizing(Box::pin(run_summarize(agent)));
234                }
235
236                AgentStreamState::Summarizing(fut) => match fut.as_mut().poll(cx) {
237                    Poll::Pending => return Poll::Pending,
238                    Poll::Ready(agent) => {
239                        this.state = if agent.streaming {
240                            AgentStreamState::ConnectingStream(Box::pin(connect_stream(agent)))
241                        } else {
242                            AgentStreamState::FetchingResponse(Box::pin(fetch_response(agent)))
243                        };
244                    }
245                },
246
247                AgentStreamState::FetchingResponse(fut) => match fut.as_mut().poll(cx) {
248                    Poll::Pending => return Poll::Pending,
249                    Poll::Ready((Err(e), agent)) => {
250                        this.agent = Some(agent);
251                        this.state = AgentStreamState::Done;
252                        return Poll::Ready(Some(Err(e)));
253                    }
254                    Poll::Ready((Ok(fetch), agent)) => {
255                        this.agent = Some(agent);
256
257                        if fetch.raw_tool_calls.is_empty() {
258                            this.state = AgentStreamState::Done;
259                            // Queue both events (reasoning then content) so neither is lost.
260                            if let Some(reasoning) = fetch.reasoning_content {
261                                this.pending_events
262                                    .push_back(AgentEvent::ReasoningToken(reasoning));
263                            }
264                            if let Some(text) = fetch.content {
265                                this.pending_events.push_back(AgentEvent::Token(text));
266                            }
267                            // The pending_events drain at the top of the loop will emit them.
268                            continue;
269                        }
270
271                        // Yield any text content before transitioning.
272                        let maybe_text = fetch.content.map(AgentEvent::Token);
273                        let pending = fetch
274                            .raw_tool_calls
275                            .iter()
276                            .cloned()
277                            .collect::<VecDeque<_>>();
278                        this.state = AgentStreamState::YieldingToolCalls {
279                            pending,
280                            raw: fetch.raw_tool_calls,
281                            from_streaming: false,
282                        };
283
284                        if let Some(event) = maybe_text {
285                            return Poll::Ready(Some(Ok(event)));
286                        }
287                        continue;
288                    }
289                },
290
291                AgentStreamState::ConnectingStream(fut) => match fut.as_mut().poll(cx) {
292                    Poll::Pending => return Poll::Pending,
293                    Poll::Ready((Err(e), agent)) => {
294                        this.agent = Some(agent);
295                        this.state = AgentStreamState::Done;
296                        return Poll::Ready(Some(Err(e)));
297                    }
298                    Poll::Ready((Ok(stream), agent)) => {
299                        this.state = AgentStreamState::StreamingChunks(Box::new(StreamingData {
300                            stream,
301                            agent,
302                            content_buf: String::new(),
303                            reasoning_buf: String::new(),
304                            tool_call_bufs: Vec::new(),
305                        }));
306                        // Loop back to hit the StreamingChunks branch.
307                    }
308                },
309
310                AgentStreamState::YieldingToolCalls {
311                    pending,
312                    raw,
313                    from_streaming,
314                } => {
315                    if !*from_streaming && let Some(tc) = pending.pop_front() {
316                        return Poll::Ready(Some(Ok(AgentEvent::ToolCall(ToolCallChunk {
317                            id: tc.id.clone(),
318                            name: tc.function.name.clone(),
319                            delta: tc.function.arguments.clone(),
320                            index: 0,
321                        }))));
322                    }
323                    // All events yielded (or streaming — already emitted as chunks).
324                    let agent = this
325                        .agent
326                        .take()
327                        .expect("agent missing in YieldingToolCalls");
328                    let raw_calls = std::mem::take(raw);
329                    this.state =
330                        AgentStreamState::ExecutingTools(Box::pin(execute_tools(agent, raw_calls)));
331                }
332
333                AgentStreamState::ExecutingTools(fut) => match fut.as_mut().poll(cx) {
334                    Poll::Pending => return Poll::Pending,
335                    Poll::Ready((tools_result, agent)) => {
336                        this.agent = Some(agent);
337                        this.state = AgentStreamState::YieldingToolResults {
338                            pending: tools_result.results.into_iter().collect(),
339                        };
340                    }
341                },
342
343                AgentStreamState::YieldingToolResults { pending } => {
344                    if let Some(result) = pending.pop_front() {
345                        return Poll::Ready(Some(Ok(AgentEvent::ToolResult(result))));
346                    }
347                    // All results delivered — loop back for the next API turn.
348                    this.state = AgentStreamState::Idle;
349                }
350
351                // Handled in the dedicated block above; this arm is unreachable
352                // but the compiler cannot verify that without exhaustiveness help.
353                AgentStreamState::StreamingChunks(_) => unreachable!(),
354            }
355        }
356    }
357}