1use crate::model::{Message, Model, Request, Tool};
10use anyhow::Result;
11use async_stream::stream;
12use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
13use futures_core::Stream;
14use tokio::sync::{mpsc, oneshot};
15use tool::{ToolRequest, ToolSender};
16
17pub use builder::AgentBuilder;
18pub use config::AgentConfig;
19pub use parser::parse_agent_md;
20
21mod builder;
22pub mod config;
23pub mod event;
24mod parser;
25pub mod tool;
26
27pub struct Agent<M: Model> {
34 pub config: AgentConfig,
36 model: M,
38 pub(crate) history: Vec<Message>,
40 tools: Vec<Tool>,
42 tool_tx: Option<ToolSender>,
44}
45
46impl<M: Model> Agent<M> {
47 pub fn push_message(&mut self, message: Message) {
49 self.history.push(message);
50 }
51
52 pub fn messages(&self) -> &[Message] {
54 &self.history
55 }
56
57 pub fn clear_history(&mut self) {
59 self.history.clear();
60 }
61
62 pub async fn step(&mut self) -> Result<AgentStep> {
68 let model_name = self
69 .config
70 .model
71 .clone()
72 .unwrap_or_else(|| self.model.active_model());
73
74 let mut messages = Vec::with_capacity(1 + self.history.len());
75 if !self.config.system_prompt.is_empty() {
76 messages.push(Message::system(&self.config.system_prompt));
77 }
78 messages.extend(self.history.iter().cloned());
79
80 let mut request = Request::new(model_name)
81 .with_messages(messages)
82 .with_tool_choice(self.config.tool_choice.clone());
83 if !self.tools.is_empty() {
84 request = request.with_tools(self.tools.clone());
85 }
86
87 let response = self.model.send(&request).await?;
88 let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
89
90 if let Some(msg) = response.message() {
91 self.history.push(msg);
92 }
93
94 let mut tool_results = Vec::new();
95 if !tool_calls.is_empty() {
96 for tc in &tool_calls {
97 let result = self
98 .dispatch_tool(&tc.function.name, &tc.function.arguments)
99 .await;
100 let msg = Message::tool(&result, tc.id.clone());
101 self.history.push(msg.clone());
102 tool_results.push(msg);
103 }
104 }
105
106 Ok(AgentStep {
107 response,
108 tool_calls,
109 tool_results,
110 })
111 }
112
113 async fn dispatch_tool(&self, name: &str, args: &str) -> String {
118 let Some(tx) = &self.tool_tx else {
119 return format!("tool '{name}' called but no tool sender configured");
120 };
121 let (reply_tx, reply_rx) = oneshot::channel();
122 let req = ToolRequest {
123 name: name.to_owned(),
124 args: args.to_owned(),
125 reply: reply_tx,
126 };
127 if tx.send(req).is_err() {
128 return format!("tool channel closed while calling '{name}'");
129 }
130 reply_rx
131 .await
132 .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
133 }
134
135 fn stop_reason(step: &AgentStep) -> AgentStopReason {
137 if step.response.content().is_some() {
138 AgentStopReason::TextResponse
139 } else {
140 AgentStopReason::NoAction
141 }
142 }
143
144 pub async fn run(&mut self, events: mpsc::UnboundedSender<AgentEvent>) -> AgentResponse {
149 use futures_util::StreamExt;
150
151 let mut stream = std::pin::pin!(self.run_stream());
152 let mut response = None;
153 while let Some(event) = stream.next().await {
154 if let AgentEvent::Done(ref resp) = event {
155 response = Some(resp.clone());
156 }
157 let _ = events.send(event);
158 }
159
160 response.unwrap_or_else(|| AgentResponse {
161 final_response: None,
162 iterations: 0,
163 stop_reason: AgentStopReason::Error("stream ended without Done".into()),
164 steps: vec![],
165 })
166 }
167
168 pub fn run_stream(&mut self) -> impl Stream<Item = AgentEvent> + '_ {
174 stream! {
175 let mut steps = Vec::new();
176 let max = self.config.max_iterations;
177
178 for _ in 0..max {
179 match self.step().await {
180 Ok(step) => {
181 let has_tool_calls = !step.tool_calls.is_empty();
182 let text = step.response.content().cloned();
183
184 if let Some(ref t) = text {
185 yield AgentEvent::TextDelta(t.clone());
186 }
187
188 if has_tool_calls {
189 yield AgentEvent::ToolCallsStart(step.tool_calls.clone());
190 for (tc, result) in step.tool_calls.iter().zip(&step.tool_results) {
191 yield AgentEvent::ToolResult {
192 call_id: tc.id.clone(),
193 output: result.content.clone(),
194 };
195 }
196 yield AgentEvent::ToolCallsComplete;
197 }
198
199 if !has_tool_calls {
200 let stop_reason = Self::stop_reason(&step);
201 steps.push(step);
202 yield AgentEvent::Done(AgentResponse {
203 final_response: text,
204 iterations: steps.len(),
205 stop_reason,
206 steps,
207 });
208 return;
209 }
210
211 steps.push(step);
212 }
213 Err(e) => {
214 yield AgentEvent::Done(AgentResponse {
215 final_response: None,
216 iterations: steps.len(),
217 stop_reason: AgentStopReason::Error(e.to_string()),
218 steps,
219 });
220 return;
221 }
222 }
223 }
224
225 let final_response = steps.last().and_then(|s| s.response.content().cloned());
226 yield AgentEvent::Done(AgentResponse {
227 final_response,
228 iterations: steps.len(),
229 stop_reason: AgentStopReason::MaxIterations,
230 steps,
231 });
232 }
233 }
234}