1use crate::model::{
11 Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12 Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use compact::COMPACT_SENTINEL;
18pub use config::AgentConfig;
19use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
20use futures_core::Stream;
21use futures_util::StreamExt;
22use tokio::sync::{mpsc, oneshot};
23pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
24
25mod builder;
26mod compact;
27pub mod config;
28pub mod event;
29pub mod tool;
30
31fn last_sender(history: &[Message]) -> compact_str::CompactString {
33 history
34 .iter()
35 .rev()
36 .find(|m| m.role == Role::User)
37 .map(|m| m.sender.clone())
38 .unwrap_or_default()
39}
40
41pub struct Agent<M: Model> {
49 pub config: AgentConfig,
51 model: M,
53 tools: Vec<Tool>,
55 tool_tx: Option<ToolSender>,
57}
58
59impl<M: Model> Agent<M> {
60 pub async fn step(&self, history: &mut Vec<Message>) -> Result<AgentStep> {
66 let model_name = self
67 .config
68 .model
69 .clone()
70 .unwrap_or_else(|| self.model.active_model());
71
72 let mut messages = Vec::with_capacity(1 + history.len());
73 if !self.config.system_prompt.is_empty() {
74 messages.push(Message::system(&self.config.system_prompt));
75 }
76 messages.extend(history.iter().cloned());
77
78 let mut request = Request::new(model_name)
79 .with_messages(messages)
80 .with_tool_choice(self.config.tool_choice.clone())
81 .with_think(self.config.thinking);
82 if !self.tools.is_empty() {
83 request = request.with_tools(self.tools.clone());
84 }
85
86 let response = self.model.send(&request).await?;
87 let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
88
89 if let Some(msg) = response.message() {
90 history.push(msg);
91 }
92
93 let mut tool_results = Vec::new();
94 if !tool_calls.is_empty() {
95 let sender = last_sender(history);
96 for tc in &tool_calls {
97 let result = self
98 .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
99 .await;
100 let msg = Message::tool(&result, tc.id.clone());
101 history.push(msg.clone());
102 tool_results.push(msg);
103 }
104 }
105
106 Ok(AgentStep {
107 response,
108 tool_calls,
109 tool_results,
110 })
111 }
112
113 async fn dispatch_tool(&self, name: &str, args: &str, sender: &str) -> String {
118 let Some(tx) = &self.tool_tx else {
119 return format!("tool '{name}' called but no tool sender configured");
120 };
121 let (reply_tx, reply_rx) = oneshot::channel();
122 let req = ToolRequest {
123 name: name.to_owned(),
124 args: args.to_owned(),
125 agent: self.config.name.to_string(),
126 reply: reply_tx,
127 task_id: None,
128 sender: sender.into(),
129 };
130 if tx.send(req).is_err() {
131 return format!("tool channel closed while calling '{name}'");
132 }
133 reply_rx
134 .await
135 .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
136 }
137
138 fn stop_reason(step: &AgentStep) -> AgentStopReason {
140 if step.response.content().is_some() {
141 AgentStopReason::TextResponse
142 } else {
143 AgentStopReason::NoAction
144 }
145 }
146
147 pub async fn run(
152 &self,
153 history: &mut Vec<Message>,
154 events: mpsc::UnboundedSender<AgentEvent>,
155 ) -> AgentResponse {
156 let mut stream = std::pin::pin!(self.run_stream(history));
157 let mut response = None;
158 while let Some(event) = stream.next().await {
159 if let AgentEvent::Done(ref resp) = event {
160 response = Some(resp.clone());
161 }
162 let _ = events.send(event);
163 }
164
165 response.unwrap_or_else(|| AgentResponse {
166 final_response: None,
167 iterations: 0,
168 stop_reason: AgentStopReason::Error("stream ended without Done".into()),
169 steps: vec![],
170 })
171 }
172
173 pub fn run_stream<'a>(
179 &'a self,
180 history: &'a mut Vec<Message>,
181 ) -> impl Stream<Item = AgentEvent> + 'a {
182 stream! {
183 let mut steps = Vec::new();
184 let max = self.config.max_iterations;
185
186 for _ in 0..max {
187 let model_name = self
189 .config
190 .model
191 .clone()
192 .unwrap_or_else(|| self.model.active_model());
193
194 let mut messages = Vec::with_capacity(1 + history.len());
195 if !self.config.system_prompt.is_empty() {
196 messages.push(Message::system(&self.config.system_prompt));
197 }
198 messages.extend(history.iter().cloned());
199
200 let mut request = Request::new(model_name)
201 .with_messages(messages)
202 .with_tool_choice(self.config.tool_choice.clone())
203 .with_think(self.config.thinking);
204 if !self.tools.is_empty() {
205 request = request.with_tools(self.tools.clone());
206 }
207
208 let mut builder = MessageBuilder::new(Role::Assistant);
210 let mut finish_reason = None;
211 let mut last_meta = CompletionMeta::default();
212 let mut last_usage = None;
213 let mut stream_error = None;
214
215 {
216 let mut chunk_stream = std::pin::pin!(self.model.stream(request));
217 while let Some(result) = chunk_stream.next().await {
218 match result {
219 Ok(chunk) => {
220 if let Some(text) = chunk.content() {
221 yield AgentEvent::TextDelta(text.to_owned());
222 }
223 if let Some(reason) = chunk.reasoning_content() {
224 yield AgentEvent::ThinkingDelta(reason.to_owned());
225 }
226 if let Some(r) = chunk.reason() {
227 finish_reason = Some(*r);
228 }
229 last_meta = chunk.meta.clone();
230 if chunk.usage.is_some() {
231 last_usage = chunk.usage.clone();
232 }
233 builder.accept(&chunk);
234 }
235 Err(e) => {
236 stream_error = Some(e.to_string());
237 break;
238 }
239 }
240 }
241 }
242 if let Some(e) = stream_error {
243 yield AgentEvent::Done(AgentResponse {
244 final_response: None,
245 iterations: steps.len(),
246 stop_reason: AgentStopReason::Error(e),
247 steps,
248 });
249 return;
250 }
251
252 let msg = builder.build();
254 let tool_calls = msg.tool_calls.to_vec();
255 let content = if msg.content.is_empty() {
256 None
257 } else {
258 Some(msg.content.clone())
259 };
260
261 let response = Response {
262 meta: last_meta,
263 choices: vec![Choice {
264 index: 0,
265 delta: Delta {
266 role: Some(Role::Assistant),
267 content: content.clone(),
268 reasoning_content: if msg.reasoning_content.is_empty() {
269 None
270 } else {
271 Some(msg.reasoning_content.clone())
272 },
273 tool_calls: if tool_calls.is_empty() {
274 None
275 } else {
276 Some(tool_calls.clone())
277 },
278 },
279 finish_reason,
280 logprobs: None,
281 }],
282 usage: last_usage.unwrap_or(Usage {
283 prompt_tokens: 0,
284 completion_tokens: 0,
285 total_tokens: 0,
286 prompt_cache_hit_tokens: None,
287 prompt_cache_miss_tokens: None,
288 completion_tokens_details: None,
289 }),
290 };
291
292 history.push(msg);
293 let has_tool_calls = !tool_calls.is_empty();
294
295 let mut tool_results = Vec::new();
297 let mut compact_triggered = false;
298 if has_tool_calls {
299 let sender = last_sender(history);
300 yield AgentEvent::ToolCallsStart(tool_calls.clone());
301 for tc in &tool_calls {
302 let result = self
303 .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
304 .await;
305 if result.starts_with(compact::COMPACT_SENTINEL) {
306 compact_triggered = true;
307 }
308 let msg = Message::tool(&result, tc.id.clone());
309 history.push(msg.clone());
310 tool_results.push(msg);
311 yield AgentEvent::ToolResult {
312 call_id: tc.id.clone(),
313 output: result,
314 };
315 }
316 yield AgentEvent::ToolCallsComplete;
317 }
318
319 if compact_triggered {
321 if let Some(summary) = self.compact(history).await {
322 let _ = self.dispatch_tool("__journal__", &summary, "").await;
324 *history = vec![Message::user(&summary)];
326 yield AgentEvent::TextDelta(
327 "\n[context compacted]\n".to_owned(),
328 );
329 }
330 continue;
332 }
333
334 let step = AgentStep {
335 response,
336 tool_calls,
337 tool_results,
338 };
339
340 if !has_tool_calls {
341 let stop_reason = Self::stop_reason(&step);
342 steps.push(step);
343 yield AgentEvent::Done(AgentResponse {
344 final_response: content,
345 iterations: steps.len(),
346 stop_reason,
347 steps,
348 });
349 return;
350 }
351
352 steps.push(step);
353 }
354
355 let final_response = steps.last().and_then(|s| s.response.content().cloned());
356 yield AgentEvent::Done(AgentResponse {
357 final_response,
358 iterations: steps.len(),
359 stop_reason: AgentStopReason::MaxIterations,
360 steps,
361 });
362 }
363 }
364}