Skip to main content

ds_api/agent/
stream.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures::Stream;
5use serde_json::Value;
6
7use crate::agent::agent_core::{AgentResponse, DeepseekAgent, ToolCallEvent};
8use crate::conversation::Conversation;
9use crate::raw::request::message::{Message, Role, ToolCall};
10
11/// API call result (internal)
12struct FetchResult {
13    content: Option<String>,
14    raw_tool_calls: Vec<ToolCall>,
15}
16
17// Tools execution result (internal)
18struct ToolsResult {
19    events: Vec<ToolCallEvent>,
20}
21
22/// AgentStream: async driver that advances in phases (fetch -> yield content -> execute tools -> yield tool events)
23pub struct AgentStream {
24    agent: Option<DeepseekAgent>,
25    state: AgentStreamState,
26}
27
28enum AgentStreamState {
29    Idle,
30    // Waiting for API response
31    FetchingResponse(
32        Pin<Box<dyn std::future::Future<Output = (Option<FetchResult>, DeepseekAgent)> + Send>>,
33    ),
34    // Content has been yielded; executing tools
35    ExecutingTools(Pin<Box<dyn std::future::Future<Output = (ToolsResult, DeepseekAgent)> + Send>>),
36    Done,
37}
38
39impl AgentStream {
40    pub fn new(agent: DeepseekAgent) -> Self {
41        Self {
42            agent: Some(agent),
43            state: AgentStreamState::Idle,
44        }
45    }
46
47    pub fn into_agent(self) -> Option<DeepseekAgent> {
48        self.agent
49    }
50}
51
52impl Stream for AgentStream {
53    type Item = AgentResponse;
54
55    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56        let this = self.get_mut();
57
58        loop {
59            match &mut this.state {
60                AgentStreamState::Done => return Poll::Ready(None),
61
62                AgentStreamState::Idle => {
63                    let agent = this.agent.take().expect("agent missing");
64                    let fut = Box::pin(fetch_response(agent));
65                    this.state = AgentStreamState::FetchingResponse(fut);
66                }
67
68                AgentStreamState::FetchingResponse(fut) => {
69                    match fut.as_mut().poll(cx) {
70                        Poll::Pending => return Poll::Pending,
71                        Poll::Ready((None, agent)) => {
72                            this.agent = Some(agent);
73                            this.state = AgentStreamState::Done;
74                            return Poll::Ready(None);
75                        }
76                        Poll::Ready((Some(fetch), agent)) => {
77                            if fetch.raw_tool_calls.is_empty() {
78                                // No tool calls: finish and return content
79                                this.agent = Some(agent);
80                                this.state = AgentStreamState::Done;
81                                return Poll::Ready(Some(AgentResponse {
82                                    content: fetch.content,
83                                    tool_calls: vec![],
84                                }));
85                            } else {
86                                // There are tool calls:
87                                // We want the first yield to return content + tool call requests (preview),
88                                // and the second yield to return the tool execution results.
89                                let content = fetch.content.clone();
90
91                                // fetch.raw_tool_calls is owned here; take it for preview and clone for execution
92                                let raw_calls_owned = fetch.raw_tool_calls;
93
94                                // build preview events: same id/name/args but result = null
95                                let preview_events: Vec<ToolCallEvent> = raw_calls_owned
96                                    .iter()
97                                    .map(|tc| ToolCallEvent {
98                                        id: tc.id.clone(),
99                                        name: tc.function.name.clone(),
100                                        args: serde_json::from_str(&tc.function.arguments)
101                                            .unwrap_or(serde_json::Value::Null),
102                                        result: serde_json::Value::Null,
103                                    })
104                                    .collect();
105
106                                // clone raw calls for execution
107                                let exec_calls = raw_calls_owned.clone();
108
109                                let fut = Box::pin(execute_tools(agent, exec_calls));
110                                this.state = AgentStreamState::ExecutingTools(fut);
111                                return Poll::Ready(Some(AgentResponse {
112                                    content,
113                                    tool_calls: preview_events,
114                                }));
115                            }
116                        }
117                    }
118                }
119
120                AgentStreamState::ExecutingTools(fut) => {
121                    match fut.as_mut().poll(cx) {
122                        Poll::Pending => return Poll::Pending,
123                        Poll::Ready((results, agent)) => {
124                            this.agent = Some(agent);
125                            // Tools finished executing: yield results, then return to Idle for the next round
126                            this.state = AgentStreamState::Idle;
127                            return Poll::Ready(Some(AgentResponse {
128                                content: None,
129                                tool_calls: results.events,
130                            }));
131                        }
132                    }
133                }
134            }
135        }
136    }
137}
138
139/// Send an API request from the agent and return FetchResult (contains assistant text and potential raw tool calls).
140async fn fetch_response(mut agent: DeepseekAgent) -> (Option<FetchResult>, DeepseekAgent) {
141    // Build the request using the conversation history
142    let history = agent.conversation.history().clone();
143    let mut req = crate::api::ApiRequest::builder().messages(history);
144
145    // Attach tools (raw definitions) to the request
146    for tool in &agent.tools {
147        for raw in tool.raw_tools() {
148            req = req.add_tool(raw);
149        }
150    }
151
152    if !agent.tools.is_empty() {
153        req = req.tool_choice_auto();
154    }
155
156    // Send the request using the ApiClient owned by the agent
157    let resp = match agent.client.send(req).await {
158        Ok(r) => r,
159        Err(_) => return (None, agent),
160    };
161
162    let choice = match resp.choices.into_iter().next() {
163        Some(c) => c,
164        None => return (None, agent),
165    };
166
167    let assistant_msg = choice.message;
168    let content = assistant_msg.content.clone();
169    let raw_tool_calls = assistant_msg.tool_calls.clone().unwrap_or_default();
170
171    // Add the assistant message into the conversation history
172    agent.conversation.history_mut().push(assistant_msg);
173
174    (
175        Some(FetchResult {
176            content,
177            raw_tool_calls,
178        }),
179        agent,
180    )
181}
182
183/// Execute tool calls, write tool results back into the conversation history, and return a list of events
184async fn execute_tools(
185    mut agent: DeepseekAgent,
186    raw_tool_calls: Vec<ToolCall>,
187) -> (ToolsResult, DeepseekAgent) {
188    let mut events = vec![];
189
190    for tc in raw_tool_calls {
191        let args: Value = serde_json::from_str(&tc.function.arguments).unwrap_or(Value::Null);
192
193        let result = match agent.tool_index.get(&tc.function.name) {
194            Some(&idx) => agent.tools[idx].call(&tc.function.name, args.clone()).await,
195            None => serde_json::json!({ "error": format!("unknown tool: {}", tc.function.name) }),
196        };
197
198        // Push the tool's returned result as a tool-role message into the conversation history (to aid subsequent dialog)
199        agent.conversation.history_mut().push(Message {
200            role: Role::Tool,
201            content: Some(result.to_string()),
202            tool_call_id: Some(tc.id.clone()),
203            ..Default::default()
204        });
205
206        events.push(ToolCallEvent {
207            id: tc.id,
208            name: tc.function.name,
209            args,
210            result,
211        });
212    }
213
214    (ToolsResult { events }, agent)
215}