1use crate::model::{
11 Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12 Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use config::AgentConfig;
18use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
19use futures_core::Stream;
20use futures_util::StreamExt;
21use tokio::sync::{mpsc, oneshot};
22pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28pub mod tool;
29
30fn last_sender(history: &[Message]) -> String {
32 history
33 .iter()
34 .rev()
35 .find(|m| m.role == Role::User)
36 .map(|m| m.sender.clone())
37 .unwrap_or_default()
38}
39
40pub struct Agent<M: Model> {
48 pub config: AgentConfig,
50 model: M,
52 tools: Vec<Tool>,
54 tool_tx: Option<ToolSender>,
56}
57
58impl<M: Model> Agent<M> {
59 fn build_request(&self, history: &[Message]) -> Request {
61 let model_name = self
62 .config
63 .model
64 .clone()
65 .unwrap_or_else(|| self.model.active_model());
66
67 let mut messages = Vec::with_capacity(1 + history.len());
68 if !self.config.system_prompt.is_empty() {
69 messages.push(Message::system(&self.config.system_prompt));
70 }
71 messages.extend(history.iter().cloned());
72
73 let mut request = Request::new(model_name)
74 .with_messages(messages)
75 .with_tool_choice(self.config.tool_choice.clone())
76 .with_think(self.config.thinking);
77 if !self.tools.is_empty() {
78 request = request.with_tools(self.tools.clone());
79 }
80 request
81 }
82
83 pub async fn step(
89 &self,
90 history: &mut Vec<Message>,
91 session_id: Option<u64>,
92 ) -> Result<AgentStep> {
93 let request = self.build_request(history);
94 let response = self.model.send(&request).await?;
95 let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
96
97 if let Some(msg) = response.message() {
98 history.push(msg);
99 }
100
101 let mut tool_results = Vec::new();
102 if !tool_calls.is_empty() {
103 let sender = last_sender(history);
104 for tc in &tool_calls {
105 let result = self
106 .dispatch_tool(
107 &tc.function.name,
108 &tc.function.arguments,
109 &sender,
110 session_id,
111 )
112 .await;
113 let msg = Message::tool(&result, tc.id.clone());
114 history.push(msg.clone());
115 tool_results.push(msg);
116 }
117 }
118
119 Ok(AgentStep {
120 response,
121 tool_calls,
122 tool_results,
123 })
124 }
125
126 async fn dispatch_tool(
131 &self,
132 name: &str,
133 args: &str,
134 sender: &str,
135 session_id: Option<u64>,
136 ) -> String {
137 let Some(tx) = &self.tool_tx else {
138 return format!("tool '{name}' called but no tool sender configured");
139 };
140 let (reply_tx, reply_rx) = oneshot::channel();
141 let req = ToolRequest {
142 name: name.to_owned(),
143 args: args.to_owned(),
144 agent: self.config.name.to_string(),
145 reply: reply_tx,
146 task_id: None,
147 sender: sender.into(),
148 session_id,
149 };
150 if tx.send(req).is_err() {
151 return format!("tool channel closed while calling '{name}'");
152 }
153 reply_rx
154 .await
155 .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
156 }
157
158 fn stop_reason(step: &AgentStep) -> AgentStopReason {
160 if step.response.content().is_some() {
161 AgentStopReason::TextResponse
162 } else {
163 AgentStopReason::NoAction
164 }
165 }
166
167 pub async fn run(
172 &self,
173 history: &mut Vec<Message>,
174 events: mpsc::UnboundedSender<AgentEvent>,
175 session_id: Option<u64>,
176 ) -> AgentResponse {
177 let mut stream = std::pin::pin!(self.run_stream(history, session_id));
178 let mut response = None;
179 while let Some(event) = stream.next().await {
180 if let AgentEvent::Done(ref resp) = event {
181 response = Some(resp.clone());
182 }
183 let _ = events.send(event);
184 }
185
186 response.unwrap_or_else(|| AgentResponse {
187 final_response: None,
188 iterations: 0,
189 stop_reason: AgentStopReason::Error("stream ended without Done".into()),
190 steps: vec![],
191 })
192 }
193
194 pub fn run_stream<'a>(
200 &'a self,
201 history: &'a mut Vec<Message>,
202 session_id: Option<u64>,
203 ) -> impl Stream<Item = AgentEvent> + 'a {
204 stream! {
205 let mut steps = Vec::new();
206 let max = self.config.max_iterations;
207
208 for _ in 0..max {
209 let request = self.build_request(history);
210
211 let mut builder = MessageBuilder::new(Role::Assistant);
213 let mut finish_reason = None;
214 let mut last_meta = CompletionMeta::default();
215 let mut last_usage = None;
216 let mut stream_error = None;
217
218 {
219 let mut chunk_stream = std::pin::pin!(self.model.stream(request));
220 while let Some(result) = chunk_stream.next().await {
221 match result {
222 Ok(chunk) => {
223 if let Some(text) = chunk.content() {
224 yield AgentEvent::TextDelta(text.to_owned());
225 }
226 if let Some(reason) = chunk.reasoning_content() {
227 yield AgentEvent::ThinkingDelta(reason.to_owned());
228 }
229 if let Some(r) = chunk.reason() {
230 finish_reason = Some(r.clone());
231 }
232 last_meta = chunk.meta.clone();
233 if chunk.usage.is_some() {
234 last_usage = chunk.usage.clone();
235 }
236 builder.accept(&chunk);
237 }
238 Err(e) => {
239 stream_error = Some(e.to_string());
240 break;
241 }
242 }
243 }
244 }
245 if let Some(e) = stream_error {
246 yield AgentEvent::Done(AgentResponse {
247 final_response: None,
248 iterations: steps.len(),
249 stop_reason: AgentStopReason::Error(e),
250 steps,
251 });
252 return;
253 }
254
255 let msg = builder.build();
257 let tool_calls = msg.tool_calls.to_vec();
258 let content = if msg.content.is_empty() {
259 None
260 } else {
261 Some(msg.content.clone())
262 };
263
264 let response = Response {
265 meta: last_meta,
266 choices: vec![Choice {
267 index: 0,
268 delta: Delta {
269 role: Some(Role::Assistant),
270 content: content.clone(),
271 reasoning_content: if msg.reasoning_content.is_empty() {
272 None
273 } else {
274 Some(msg.reasoning_content.clone())
275 },
276 tool_calls: if tool_calls.is_empty() {
277 None
278 } else {
279 Some(tool_calls.clone())
280 },
281 },
282 finish_reason,
283 logprobs: None,
284 }],
285 usage: last_usage.unwrap_or(Usage {
286 prompt_tokens: 0,
287 completion_tokens: 0,
288 total_tokens: 0,
289 prompt_cache_hit_tokens: None,
290 prompt_cache_miss_tokens: None,
291 completion_tokens_details: None,
292 }),
293 };
294
295 history.push(msg);
296 let has_tool_calls = !tool_calls.is_empty();
297
298 let mut tool_results = Vec::new();
300 if has_tool_calls {
301 let sender = last_sender(history);
302 yield AgentEvent::ToolCallsStart(tool_calls.clone());
303 for tc in &tool_calls {
304 let result = self
305 .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender, session_id)
306 .await;
307 let msg = Message::tool(&result, tc.id.clone());
308 history.push(msg.clone());
309 tool_results.push(msg);
310 yield AgentEvent::ToolResult {
311 call_id: tc.id.clone(),
312 output: result,
313 };
314 }
315 yield AgentEvent::ToolCallsComplete;
316 }
317
318 if let Some(threshold) = self.config.compact_threshold
320 && Self::estimate_tokens(history) > threshold
321 {
322 if let Some(summary) = self.compact(history).await {
323 yield AgentEvent::Compact { summary: summary.clone() };
324 *history = vec![Message::user(&summary)];
325 yield AgentEvent::TextDelta(
326 "\n[context compacted]\n".to_owned(),
327 );
328 }
329 continue;
330 }
331
332 let step = AgentStep {
333 response,
334 tool_calls,
335 tool_results,
336 };
337
338 if !has_tool_calls {
339 let stop_reason = Self::stop_reason(&step);
340 steps.push(step);
341 yield AgentEvent::Done(AgentResponse {
342 final_response: content,
343 iterations: steps.len(),
344 stop_reason,
345 steps,
346 });
347 return;
348 }
349
350 steps.push(step);
351 }
352
353 let final_response = steps.last().and_then(|s| s.response.content().cloned());
354 yield AgentEvent::Done(AgentResponse {
355 final_response,
356 iterations: steps.len(),
357 stop_reason: AgentStopReason::MaxIterations,
358 steps,
359 });
360 }
361 }
362}