use std::time::Instant;

use crate::events::AgentEvent;
use crate::types::{Message, StopReason, ToolDefinition};

use super::context::{build_effective_prompt, resolve_context, ContextLoadResult, PathVariables};
use super::helpers::extract_text_response;
use super::types::{AgentError, AgentResponse, TokenUsageStats, ToolCallInfo};
use super::Agent;

#[cfg(feature = "session")]
use crate::session::{MessageRole, Session, SessionMessage, ToolCall, ToolResult};

#[cfg(feature = "session")]
use super::session::convert_session_message_to_mixtape;

impl Agent {
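    /// Runs the agent to completion for a single user message.
    ///
    /// The message is appended to the conversation, then the agent loops:
    /// calling the model, executing any requested tools, and feeding the
    /// results back until the model stops with a terminal stop reason or
    /// an error is returned.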
    pub async fn run(&self, user_message: &str) -> Result<AgentResponse, AgentError> {
        let run_start = Instant::now();

        let mut tool_call_infos: Vec<ToolCallInfo> = Vec::new();
        let mut total_input_tokens: usize = 0;
        let mut total_output_tokens: usize = 0;
        let mut model_call_count: usize = 0;

        let context_result = self.resolve_context_files()?;

        *self.last_context_result.write() = Some(context_result.clone());

        let effective_system_prompt =
            build_effective_prompt(self.system_prompt.as_deref(), &context_result);

        self.emit_event(AgentEvent::RunStarted {
            input: user_message.to_string(),
            timestamp: run_start,
        });

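        // When a session store is configured, resume any existing session by
        // converting its saved messages and hydrating the conversation manager.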
        #[cfg(feature = "session")]
        let mut session: Option<Session> = if let Some(store) = &self.session_store {
            let sess = store.get_or_create_session().await?;

            if !sess.messages.is_empty() {
                let mut messages: Vec<Message> = vec![];
                for msg in &sess.messages {
                    messages.extend(convert_session_message_to_mixtape(msg)?);
                }
                self.conversation_manager.write().hydrate(messages);

                self.emit_event(AgentEvent::SessionResumed {
                    session_id: sess.id.clone(),
                    message_count: sess.messages.len(),
                    created_at: sess.created_at,
                });
            }

            Some(sess)
        } else {
            None
        };

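        // Tool activity accumulated during this run, persisted with the
        // session when the run completes.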
        #[cfg(feature = "session")]
        let mut session_tool_calls: Vec<ToolCall> = Vec::new();
        #[cfg(feature = "session")]
        let mut session_tool_results: Vec<ToolResult> = Vec::new();

        self.conversation_manager
            .write()
            .add_message(Message::user(user_message));

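        // Core agent loop: call the model, run any requested tools, and
        // repeat until the model stops for a terminal reason.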
        loop {
            let tool_defs: Vec<ToolDefinition> = self
                .tools
                .iter()
                .map(|t| ToolDefinition {
                    name: t.name().to_string(),
                    description: t.description().to_string(),
                    input_schema: t.input_schema(),
                })
                .collect();

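            // Build the message window that fits within the provider's
            // context limit before sending it to the model.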
            let limits =
                crate::conversation::ContextLimits::new(self.provider.max_context_tokens());
            let provider = &self.provider;
            let estimate_tokens = |msgs: &[Message]| provider.estimate_message_tokens(msgs);
            let context_messages = self
                .conversation_manager
                .read()
                .messages_for_context(limits, &estimate_tokens);

            let model_call_start = Instant::now();
            self.emit_event(AgentEvent::ModelCallStarted {
                message_count: context_messages.len(),
                tool_count: tool_defs.len(),
                timestamp: model_call_start,
            });

            let response = self
                .generate_with_streaming(
                    context_messages,
                    tool_defs,
                    effective_system_prompt.clone(),
                )
                .await?;

            model_call_count += 1;
            if let Some(ref usage) = response.usage {
                total_input_tokens += usage.input_tokens;
                total_output_tokens += usage.output_tokens;
            }

            let response_text = response.message.text();

            self.emit_event(AgentEvent::ModelCallCompleted {
                response_content: response_text,
                tokens: response.usage,
                duration: model_call_start.elapsed(),
                stop_reason: Some(response.stop_reason),
            });

            self.conversation_manager
                .write()
                .add_message(response.message.clone());

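            // Decide what to do next based on why the model stopped.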
            match response.stop_reason {
                StopReason::ToolUse => {
                    let tool_results = self
                        .process_tool_calls(
                            &response.message,
                            &mut tool_call_infos,
                            #[cfg(feature = "session")]
                            &mut session_tool_calls,
                            #[cfg(feature = "session")]
                            &mut session_tool_results,
                        )
                        .await;

                    self.conversation_manager
                        .write()
                        .add_message(Message::tool_results(tool_results));
                }
                StopReason::EndTurn => {
                    return self
                        .finalize_run(
                            &response.message,
                            user_message,
                            tool_call_infos,
                            total_input_tokens,
                            total_output_tokens,
                            model_call_count,
                            run_start,
                            #[cfg(feature = "session")]
                            &mut session,
                            #[cfg(feature = "session")]
                            &session_tool_calls,
                            #[cfg(feature = "session")]
                            &session_tool_results,
                        )
                        .await;
                }
                StopReason::MaxTokens => {
                    self.emit_event(AgentEvent::RunFailed {
                        error: AgentError::MaxTokensExceeded.to_string(),
                        duration: run_start.elapsed(),
                    });
                    return Err(AgentError::MaxTokensExceeded);
                }
                StopReason::ContentFiltered => {
                    self.emit_event(AgentEvent::RunFailed {
                        error: AgentError::ContentFiltered.to_string(),
                        duration: run_start.elapsed(),
                    });
                    return Err(AgentError::ContentFiltered);
                }
                StopReason::StopSequence => {
                    let final_response =
                        extract_text_response(&response.message).unwrap_or_default();

                    let duration = run_start.elapsed();
                    self.emit_event(AgentEvent::RunCompleted {
                        output: final_response.clone(),
                        duration,
                    });

                    let token_usage = if total_input_tokens > 0 || total_output_tokens > 0 {
                        Some(TokenUsageStats {
                            input_tokens: total_input_tokens,
                            output_tokens: total_output_tokens,
                        })
                    } else {
                        None
                    };

                    return Ok(AgentResponse {
                        text: final_response,
                        tool_calls: tool_call_infos,
                        token_usage,
                        duration,
                        model_calls: model_call_count,
                    });
                }
                StopReason::PauseTurn => {
                    // The turn was paused rather than completed; fall through
                    // so the loop calls the model again.
                }
                StopReason::Unknown => {
                    let error = AgentError::UnexpectedStopReason("Unknown".to_string());
                    self.emit_event(AgentEvent::RunFailed {
                        error: error.to_string(),
                        duration: run_start.elapsed(),
                    });
                    return Err(error);
                }
            }
        }
    }

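    /// Completes a successful run: persists the exchange to the session (when
    /// the `session` feature is enabled), emits the `RunCompleted` event, and
    /// assembles the final `AgentResponse`.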
    #[allow(clippy::too_many_arguments)]
    #[allow(unused_variables)]
    async fn finalize_run(
        &self,
        message: &Message,
        user_message: &str,
        tool_call_infos: Vec<ToolCallInfo>,
        total_input_tokens: usize,
        total_output_tokens: usize,
        model_call_count: usize,
        run_start: Instant,
        #[cfg(feature = "session")] session: &mut Option<Session>,
        #[cfg(feature = "session")] session_tool_calls: &[ToolCall],
        #[cfg(feature = "session")] session_tool_results: &[ToolResult],
    ) -> Result<AgentResponse, AgentError> {
        let final_response = extract_text_response(message).ok_or(AgentError::NoResponse)?;

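        // Record the user/assistant exchange (including any tool activity) in
        // the session and persist it through the configured store.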
        #[cfg(feature = "session")]
        if let (Some(ref mut sess), Some(ref store)) = (session, &self.session_store) {
            use chrono::Utc;

            sess.messages.push(SessionMessage {
                role: MessageRole::User,
                content: user_message.to_string(),
                tool_calls: vec![],
                tool_results: vec![],
                timestamp: Utc::now(),
            });

            sess.messages.push(SessionMessage {
                role: MessageRole::Assistant,
                content: final_response.clone(),
                tool_calls: session_tool_calls.to_vec(),
                tool_results: session_tool_results.to_vec(),
                timestamp: Utc::now(),
            });

            store.save_session(sess).await?;

            self.emit_event(AgentEvent::SessionSaved {
                session_id: sess.id.clone(),
                message_count: sess.messages.len(),
            });
        }

        let duration = run_start.elapsed();
        self.emit_event(AgentEvent::RunCompleted {
            output: final_response.clone(),
            duration,
        });

        let token_usage = if total_input_tokens > 0 || total_output_tokens > 0 {
            Some(TokenUsageStats {
                input_tokens: total_input_tokens,
                output_tokens: total_output_tokens,
            })
        } else {
            None
        };

        Ok(AgentResponse {
            text: final_response,
            tool_calls: tool_call_infos,
            token_usage,
            duration,
            model_calls: model_call_count,
        })
    }

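    /// Resolves the configured context sources into loaded context files,
    /// returning an empty result when no sources are configured.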
    fn resolve_context_files(&self) -> Result<ContextLoadResult, AgentError> {
        if self.context_sources.is_empty() {
            return Ok(ContextLoadResult::default());
        }

        let vars = PathVariables::current();
        resolve_context(&self.context_sources, &vars, &self.context_config).map_err(|e| e.into())
    }
}