1use crate::model::{
11 Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12 Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use config::AgentConfig;
18use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
19use futures_core::Stream;
20use futures_util::StreamExt;
21use tokio::sync::{mpsc, oneshot};
22pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28pub mod tool;
29
30fn last_sender(history: &[Message]) -> String {
32 history
33 .iter()
34 .rev()
35 .find(|m| m.role == Role::User)
36 .map(|m| m.sender.clone())
37 .unwrap_or_default()
38}
39
40pub struct Agent<M: Model> {
48 pub config: AgentConfig,
50 model: M,
52 tools: Vec<Tool>,
54 tool_tx: Option<ToolSender>,
56}
57
58impl<M: Model> Agent<M> {
59 fn build_request(&self, history: &[Message]) -> Request {
61 let model_name = self
62 .config
63 .model
64 .clone()
65 .unwrap_or_else(|| self.model.active_model());
66
67 let mut messages = Vec::with_capacity(1 + history.len());
68 if !self.config.system_prompt.is_empty() {
69 messages.push(Message::system(&self.config.system_prompt));
70 }
71 messages.extend(history.iter().cloned());
72
73 let mut request = Request::new(model_name)
74 .with_messages(messages)
75 .with_tool_choice(self.config.tool_choice.clone())
76 .with_think(self.config.thinking);
77 if !self.tools.is_empty() {
78 request = request.with_tools(self.tools.clone());
79 }
80 request
81 }
82
83 pub async fn step(
89 &self,
90 history: &mut Vec<Message>,
91 session_id: Option<u64>,
92 ) -> Result<AgentStep> {
93 let request = self.build_request(history);
94 let response = self.model.send(&request).await?;
95 let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
96
97 if let Some(msg) = response.message() {
98 history.push(msg);
99 }
100
101 let mut tool_results = Vec::new();
102 if !tool_calls.is_empty() {
103 let sender = last_sender(history);
104 for tc in &tool_calls {
105 let result = self
106 .dispatch_tool(
107 &tc.function.name,
108 &tc.function.arguments,
109 &sender,
110 session_id,
111 )
112 .await;
113 let msg = Message::tool(&result, tc.id.clone());
114 history.push(msg.clone());
115 tool_results.push(msg);
116 }
117 }
118
119 Ok(AgentStep {
120 response,
121 tool_calls,
122 tool_results,
123 })
124 }
125
126 async fn dispatch_tool(
131 &self,
132 name: &str,
133 args: &str,
134 sender: &str,
135 session_id: Option<u64>,
136 ) -> String {
137 let Some(tx) = &self.tool_tx else {
138 return format!("tool '{name}' called but no tool sender configured");
139 };
140 let (reply_tx, reply_rx) = oneshot::channel();
141 let req = ToolRequest {
142 name: name.to_owned(),
143 args: args.to_owned(),
144 agent: self.config.name.to_string(),
145 reply: reply_tx,
146 task_id: None,
147 sender: sender.into(),
148 session_id,
149 };
150 if tx.send(req).is_err() {
151 return format!("tool channel closed while calling '{name}'");
152 }
153 reply_rx
154 .await
155 .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
156 }
157
158 fn stop_reason(step: &AgentStep) -> AgentStopReason {
160 if step.response.content().is_some() {
161 AgentStopReason::TextResponse
162 } else {
163 AgentStopReason::NoAction
164 }
165 }
166
167 pub async fn run(
172 &self,
173 history: &mut Vec<Message>,
174 events: mpsc::UnboundedSender<AgentEvent>,
175 session_id: Option<u64>,
176 ) -> AgentResponse {
177 let mut stream = std::pin::pin!(self.run_stream(history, session_id));
178 let mut response = None;
179 while let Some(event) = stream.next().await {
180 if let AgentEvent::Done(ref resp) = event {
181 response = Some(resp.clone());
182 }
183 let _ = events.send(event);
184 }
185
186 response.unwrap_or_else(|| AgentResponse {
187 final_response: None,
188 iterations: 0,
189 stop_reason: AgentStopReason::Error("stream ended without Done".into()),
190 steps: vec![],
191 })
192 }
193
194 pub fn run_stream<'a>(
200 &'a self,
201 history: &'a mut Vec<Message>,
202 session_id: Option<u64>,
203 ) -> impl Stream<Item = AgentEvent> + 'a {
204 stream! {
205 let mut steps = Vec::new();
206 let max = self.config.max_iterations;
207
208 for _ in 0..max {
209 let request = self.build_request(history);
210
211 let mut builder = MessageBuilder::new(Role::Assistant);
213 let mut finish_reason = None;
214 let mut last_meta = CompletionMeta::default();
215 let mut last_usage = None;
216 let mut stream_error = None;
217 let mut tool_begin_emitted = false;
218
219 {
220 let mut chunk_stream = std::pin::pin!(self.model.stream(request));
221 while let Some(result) = chunk_stream.next().await {
222 match result {
223 Ok(chunk) => {
224 if let Some(text) = chunk.content() {
225 yield AgentEvent::TextDelta(text.to_owned());
226 }
227 if let Some(reason) = chunk.reasoning_content() {
228 yield AgentEvent::ThinkingDelta(reason.to_owned());
229 }
230 if let Some(r) = chunk.reason() {
231 finish_reason = Some(r.clone());
232 }
233 last_meta = chunk.meta.clone();
234 if chunk.usage.is_some() {
235 last_usage = chunk.usage.clone();
236 }
237 builder.accept(&chunk);
238 if !tool_begin_emitted {
243 let calls = builder.peek_tool_calls();
244 if !calls.is_empty() {
245 tool_begin_emitted = true;
246 yield AgentEvent::ToolCallsBegin(calls);
247 }
248 }
249 }
250 Err(e) => {
251 stream_error = Some(e.to_string());
252 break;
253 }
254 }
255 }
256 }
257 if let Some(e) = stream_error {
258 yield AgentEvent::Done(AgentResponse {
259 final_response: None,
260 iterations: steps.len(),
261 stop_reason: AgentStopReason::Error(e),
262 steps,
263 });
264 return;
265 }
266
267 let msg = builder.build();
269 let tool_calls = msg.tool_calls.to_vec();
270 let content = if msg.content.is_empty() {
271 None
272 } else {
273 Some(msg.content.clone())
274 };
275
276 let response = Response {
277 meta: last_meta,
278 choices: vec![Choice {
279 index: 0,
280 delta: Delta {
281 role: Some(Role::Assistant),
282 content: content.clone(),
283 reasoning_content: if msg.reasoning_content.is_empty() {
284 None
285 } else {
286 Some(msg.reasoning_content.clone())
287 },
288 tool_calls: if tool_calls.is_empty() {
289 None
290 } else {
291 Some(tool_calls.clone())
292 },
293 },
294 finish_reason,
295 logprobs: None,
296 }],
297 usage: last_usage.unwrap_or(Usage {
298 prompt_tokens: 0,
299 completion_tokens: 0,
300 total_tokens: 0,
301 prompt_cache_hit_tokens: None,
302 prompt_cache_miss_tokens: None,
303 completion_tokens_details: None,
304 }),
305 };
306
307 history.push(msg);
308 let has_tool_calls = !tool_calls.is_empty();
309
310 let mut tool_results = Vec::new();
314 if has_tool_calls {
315 let sender = last_sender(history);
316 yield AgentEvent::ToolCallsStart(tool_calls.clone());
317 for tc in &tool_calls {
318 let tool_start = std::time::Instant::now();
319 let result = self
320 .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender, session_id)
321 .await;
322 let duration_ms = tool_start.elapsed().as_millis() as u64;
323 let msg = Message::tool(&result, tc.id.clone());
324 history.push(msg.clone());
325 tool_results.push(msg);
326 yield AgentEvent::ToolResult {
327 call_id: tc.id.clone(),
328 output: result,
329 duration_ms,
330 };
331 }
332 yield AgentEvent::ToolCallsComplete;
333 }
334
335 if let Some(threshold) = self.config.compact_threshold
337 && Self::estimate_tokens(history) > threshold
338 {
339 if let Some(summary) = self.compact(history).await {
340 yield AgentEvent::Compact { summary: summary.clone() };
341 *history = vec![Message::user(&summary)];
342 yield AgentEvent::TextDelta(
343 "\n[context compacted]\n".to_owned(),
344 );
345 }
346 continue;
347 }
348
349 let step = AgentStep {
350 response,
351 tool_calls,
352 tool_results,
353 };
354
355 if !has_tool_calls {
356 let stop_reason = Self::stop_reason(&step);
357 steps.push(step);
358 yield AgentEvent::Done(AgentResponse {
359 final_response: content,
360 iterations: steps.len(),
361 stop_reason,
362 steps,
363 });
364 return;
365 }
366
367 steps.push(step);
368 }
369
370 let final_response = steps.last().and_then(|s| s.response.content().cloned());
371 yield AgentEvent::Done(AgentResponse {
372 final_response,
373 iterations: steps.len(),
374 stop_reason: AgentStopReason::MaxIterations,
375 steps,
376 });
377 }
378 }
379}