1use crate::model::{
11 Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12 Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use config::AgentConfig;
18use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
19use futures_core::Stream;
20use futures_util::StreamExt;
21use tokio::sync::{mpsc, oneshot};
22pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28pub mod tool;
29
30fn last_sender(history: &[Message]) -> String {
32 history
33 .iter()
34 .rev()
35 .find(|m| m.role == Role::User)
36 .map(|m| m.sender.clone())
37 .unwrap_or_default()
38}
39
/// An agent that pairs a [`Model`] backend with a configuration, a list of
/// tool definitions, and an optional channel used to dispatch tool calls.
pub struct Agent<M: Model> {
    /// Agent configuration. The methods in this file read the model override,
    /// system prompt, tool choice, thinking flag, agent name, iteration limit
    /// and compaction threshold from it.
    pub config: AgentConfig,
    /// Model backend used to send (`step`) and stream (`run_stream`) requests.
    model: M,
    /// Tool definitions attached to every request when the list is non-empty.
    tools: Vec<Tool>,
    /// Channel over which tool calls are forwarded. When `None`, a requested
    /// tool call resolves to an explanatory error string instead of running.
    tool_tx: Option<ToolSender>,
}
57
58impl<M: Model> Agent<M> {
59 fn build_request(&self, history: &[Message]) -> Request {
61 let model_name = self
62 .config
63 .model
64 .clone()
65 .unwrap_or_else(|| self.model.active_model());
66
67 let mut messages = Vec::with_capacity(1 + history.len());
68 if !self.config.system_prompt.is_empty() {
69 messages.push(Message::system(&self.config.system_prompt));
70 }
71 messages.extend(history.iter().cloned());
72
73 let mut request = Request::new(model_name)
74 .with_messages(messages)
75 .with_tool_choice(self.config.tool_choice.clone())
76 .with_think(self.config.thinking);
77 if !self.tools.is_empty() {
78 request = request.with_tools(self.tools.clone());
79 }
80 request
81 }
82
83 pub async fn step(&self, history: &mut Vec<Message>) -> Result<AgentStep> {
89 let request = self.build_request(history);
90 let response = self.model.send(&request).await?;
91 let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
92
93 if let Some(msg) = response.message() {
94 history.push(msg);
95 }
96
97 let mut tool_results = Vec::new();
98 if !tool_calls.is_empty() {
99 let sender = last_sender(history);
100 for tc in &tool_calls {
101 let result = self
102 .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
103 .await;
104 let msg = Message::tool(&result, tc.id.clone());
105 history.push(msg.clone());
106 tool_results.push(msg);
107 }
108 }
109
110 Ok(AgentStep {
111 response,
112 tool_calls,
113 tool_results,
114 })
115 }
116
117 async fn dispatch_tool(&self, name: &str, args: &str, sender: &str) -> String {
122 let Some(tx) = &self.tool_tx else {
123 return format!("tool '{name}' called but no tool sender configured");
124 };
125 let (reply_tx, reply_rx) = oneshot::channel();
126 let req = ToolRequest {
127 name: name.to_owned(),
128 args: args.to_owned(),
129 agent: self.config.name.to_string(),
130 reply: reply_tx,
131 task_id: None,
132 sender: sender.into(),
133 };
134 if tx.send(req).is_err() {
135 return format!("tool channel closed while calling '{name}'");
136 }
137 reply_rx
138 .await
139 .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
140 }
141
142 fn stop_reason(step: &AgentStep) -> AgentStopReason {
144 if step.response.content().is_some() {
145 AgentStopReason::TextResponse
146 } else {
147 AgentStopReason::NoAction
148 }
149 }
150
151 pub async fn run(
156 &self,
157 history: &mut Vec<Message>,
158 events: mpsc::UnboundedSender<AgentEvent>,
159 ) -> AgentResponse {
160 let mut stream = std::pin::pin!(self.run_stream(history));
161 let mut response = None;
162 while let Some(event) = stream.next().await {
163 if let AgentEvent::Done(ref resp) = event {
164 response = Some(resp.clone());
165 }
166 let _ = events.send(event);
167 }
168
169 response.unwrap_or_else(|| AgentResponse {
170 final_response: None,
171 iterations: 0,
172 stop_reason: AgentStopReason::Error("stream ended without Done".into()),
173 steps: vec![],
174 })
175 }
176
177 pub fn run_stream<'a>(
183 &'a self,
184 history: &'a mut Vec<Message>,
185 ) -> impl Stream<Item = AgentEvent> + 'a {
186 stream! {
187 let mut steps = Vec::new();
188 let max = self.config.max_iterations;
189
190 for _ in 0..max {
191 let request = self.build_request(history);
192
193 let mut builder = MessageBuilder::new(Role::Assistant);
195 let mut finish_reason = None;
196 let mut last_meta = CompletionMeta::default();
197 let mut last_usage = None;
198 let mut stream_error = None;
199
200 {
201 let mut chunk_stream = std::pin::pin!(self.model.stream(request));
202 while let Some(result) = chunk_stream.next().await {
203 match result {
204 Ok(chunk) => {
205 if let Some(text) = chunk.content() {
206 yield AgentEvent::TextDelta(text.to_owned());
207 }
208 if let Some(reason) = chunk.reasoning_content() {
209 yield AgentEvent::ThinkingDelta(reason.to_owned());
210 }
211 if let Some(r) = chunk.reason() {
212 finish_reason = Some(r.clone());
213 }
214 last_meta = chunk.meta.clone();
215 if chunk.usage.is_some() {
216 last_usage = chunk.usage.clone();
217 }
218 builder.accept(&chunk);
219 }
220 Err(e) => {
221 stream_error = Some(e.to_string());
222 break;
223 }
224 }
225 }
226 }
227 if let Some(e) = stream_error {
228 yield AgentEvent::Done(AgentResponse {
229 final_response: None,
230 iterations: steps.len(),
231 stop_reason: AgentStopReason::Error(e),
232 steps,
233 });
234 return;
235 }
236
237 let msg = builder.build();
239 let tool_calls = msg.tool_calls.to_vec();
240 let content = if msg.content.is_empty() {
241 None
242 } else {
243 Some(msg.content.clone())
244 };
245
246 let response = Response {
247 meta: last_meta,
248 choices: vec![Choice {
249 index: 0,
250 delta: Delta {
251 role: Some(Role::Assistant),
252 content: content.clone(),
253 reasoning_content: if msg.reasoning_content.is_empty() {
254 None
255 } else {
256 Some(msg.reasoning_content.clone())
257 },
258 tool_calls: if tool_calls.is_empty() {
259 None
260 } else {
261 Some(tool_calls.clone())
262 },
263 },
264 finish_reason,
265 logprobs: None,
266 }],
267 usage: last_usage.unwrap_or(Usage {
268 prompt_tokens: 0,
269 completion_tokens: 0,
270 total_tokens: 0,
271 prompt_cache_hit_tokens: None,
272 prompt_cache_miss_tokens: None,
273 completion_tokens_details: None,
274 }),
275 };
276
277 history.push(msg);
278 let has_tool_calls = !tool_calls.is_empty();
279
280 let mut tool_results = Vec::new();
282 if has_tool_calls {
283 let sender = last_sender(history);
284 yield AgentEvent::ToolCallsStart(tool_calls.clone());
285 for tc in &tool_calls {
286 let result = self
287 .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
288 .await;
289 let msg = Message::tool(&result, tc.id.clone());
290 history.push(msg.clone());
291 tool_results.push(msg);
292 yield AgentEvent::ToolResult {
293 call_id: tc.id.clone(),
294 output: result,
295 };
296 }
297 yield AgentEvent::ToolCallsComplete;
298 }
299
300 if let Some(threshold) = self.config.compact_threshold
302 && Self::estimate_tokens(history) > threshold
303 {
304 if let Some(summary) = self.compact(history).await {
305 yield AgentEvent::Compact { summary: summary.clone() };
306 *history = vec![Message::user(&summary)];
307 yield AgentEvent::TextDelta(
308 "\n[context compacted]\n".to_owned(),
309 );
310 }
311 continue;
312 }
313
314 let step = AgentStep {
315 response,
316 tool_calls,
317 tool_results,
318 };
319
320 if !has_tool_calls {
321 let stop_reason = Self::stop_reason(&step);
322 steps.push(step);
323 yield AgentEvent::Done(AgentResponse {
324 final_response: content,
325 iterations: steps.len(),
326 stop_reason,
327 steps,
328 });
329 return;
330 }
331
332 steps.push(step);
333 }
334
335 let final_response = steps.last().and_then(|s| s.response.content().cloned());
336 yield AgentEvent::Done(AgentResponse {
337 final_response,
338 iterations: steps.len(),
339 stop_reason: AgentStopReason::MaxIterations,
340 steps,
341 });
342 }
343 }
344}