1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures::Stream;
5use serde_json::Value;
6
7use crate::agent::agent_core::{AgentResponse, DeepseekAgent, ToolCallEvent};
8use crate::conversation::Conversation;
9use crate::raw::request::message::{Message, Role, ToolCall};
10
11struct FetchResult {
13 content: Option<String>,
14 raw_tool_calls: Vec<ToolCall>,
15}
16
17struct ToolsResult {
19 events: Vec<ToolCallEvent>,
20}
21
22pub struct AgentStream {
24 agent: Option<DeepseekAgent>,
25 state: AgentStreamState,
26}
27
28enum AgentStreamState {
29 Idle,
30 FetchingResponse(
32 Pin<Box<dyn std::future::Future<Output = (Option<FetchResult>, DeepseekAgent)> + Send>>,
33 ),
34 ExecutingTools(Pin<Box<dyn std::future::Future<Output = (ToolsResult, DeepseekAgent)> + Send>>),
36 Done,
37}
38
39impl AgentStream {
40 pub fn new(agent: DeepseekAgent) -> Self {
41 Self {
42 agent: Some(agent),
43 state: AgentStreamState::Idle,
44 }
45 }
46
47 pub fn into_agent(self) -> Option<DeepseekAgent> {
48 self.agent
49 }
50}
51
52impl Stream for AgentStream {
53 type Item = AgentResponse;
54
55 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56 let this = self.get_mut();
57
58 loop {
59 match &mut this.state {
60 AgentStreamState::Done => return Poll::Ready(None),
61
62 AgentStreamState::Idle => {
63 let agent = this.agent.take().expect("agent missing");
64 let fut = Box::pin(fetch_response(agent));
65 this.state = AgentStreamState::FetchingResponse(fut);
66 }
67
68 AgentStreamState::FetchingResponse(fut) => {
69 match fut.as_mut().poll(cx) {
70 Poll::Pending => return Poll::Pending,
71 Poll::Ready((None, agent)) => {
72 this.agent = Some(agent);
73 this.state = AgentStreamState::Done;
74 return Poll::Ready(None);
75 }
76 Poll::Ready((Some(fetch), agent)) => {
77 if fetch.raw_tool_calls.is_empty() {
78 this.agent = Some(agent);
80 this.state = AgentStreamState::Done;
81 return Poll::Ready(Some(AgentResponse {
82 content: fetch.content,
83 tool_calls: vec![],
84 }));
85 } else {
86 let content = fetch.content.clone();
90
91 let raw_calls_owned = fetch.raw_tool_calls;
93
94 let preview_events: Vec<ToolCallEvent> = raw_calls_owned
96 .iter()
97 .map(|tc| ToolCallEvent {
98 id: tc.id.clone(),
99 name: tc.function.name.clone(),
100 args: serde_json::from_str(&tc.function.arguments)
101 .unwrap_or(serde_json::Value::Null),
102 result: serde_json::Value::Null,
103 })
104 .collect();
105
106 let exec_calls = raw_calls_owned.clone();
108
109 let fut = Box::pin(execute_tools(agent, exec_calls));
110 this.state = AgentStreamState::ExecutingTools(fut);
111 return Poll::Ready(Some(AgentResponse {
112 content,
113 tool_calls: preview_events,
114 }));
115 }
116 }
117 }
118 }
119
120 AgentStreamState::ExecutingTools(fut) => {
121 match fut.as_mut().poll(cx) {
122 Poll::Pending => return Poll::Pending,
123 Poll::Ready((results, agent)) => {
124 this.agent = Some(agent);
125 this.state = AgentStreamState::Idle;
127 return Poll::Ready(Some(AgentResponse {
128 content: None,
129 tool_calls: results.events,
130 }));
131 }
132 }
133 }
134 }
135 }
136 }
137}
138
139async fn fetch_response(mut agent: DeepseekAgent) -> (Option<FetchResult>, DeepseekAgent) {
141 let history = agent.conversation.history().clone();
143 let mut req = crate::api::ApiRequest::builder().messages(history);
144
145 for tool in &agent.tools {
147 for raw in tool.raw_tools() {
148 req = req.add_tool(raw);
149 }
150 }
151
152 if !agent.tools.is_empty() {
153 req = req.tool_choice_auto();
154 }
155
156 let resp = match agent.client.send(req).await {
158 Ok(r) => r,
159 Err(_) => return (None, agent),
160 };
161
162 let choice = match resp.choices.into_iter().next() {
163 Some(c) => c,
164 None => return (None, agent),
165 };
166
167 let assistant_msg = choice.message;
168 let content = assistant_msg.content.clone();
169 let raw_tool_calls = assistant_msg.tool_calls.clone().unwrap_or_default();
170
171 agent.conversation.history_mut().push(assistant_msg);
173
174 (
175 Some(FetchResult {
176 content,
177 raw_tool_calls,
178 }),
179 agent,
180 )
181}
182
183async fn execute_tools(
185 mut agent: DeepseekAgent,
186 raw_tool_calls: Vec<ToolCall>,
187) -> (ToolsResult, DeepseekAgent) {
188 let mut events = vec![];
189
190 for tc in raw_tool_calls {
191 let args: Value = serde_json::from_str(&tc.function.arguments).unwrap_or(Value::Null);
192
193 let result = match agent.tool_index.get(&tc.function.name) {
194 Some(&idx) => agent.tools[idx].call(&tc.function.name, args.clone()).await,
195 None => serde_json::json!({ "error": format!("unknown tool: {}", tc.function.name) }),
196 };
197
198 agent.conversation.history_mut().push(Message {
200 role: Role::Tool,
201 content: Some(result.to_string()),
202 tool_call_id: Some(tc.id.clone()),
203 ..Default::default()
204 });
205
206 events.push(ToolCallEvent {
207 id: tc.id,
208 name: tc.function.name,
209 args,
210 result,
211 });
212 }
213
214 (ToolsResult { events }, agent)
215}