1use std::sync::Arc;
7
8use tokio::sync::mpsc;
9use tracing::{debug, info, instrument, warn};
10
11use super::capability::capability_matches;
12use super::driver::{
13 CompletionRequest, CompletionResponse, LlmDriver, Message, StreamEvent, ToolCall, ToolResultMsg,
14};
15use super::guard::{LoopGuard, LoopVerdict};
16use super::manifest::AgentManifest;
17use super::memory::{MemorySource, MemorySubstrate};
18use super::phase::LoopPhase;
19use super::result::{AgentError, AgentLoopResult, StopReason};
20use super::runtime_helpers::{call_with_retry, emit, truncate_messages};
21use super::tool::ToolRegistry;
22use crate::serve::context::{
23 ContextConfig, ContextManager, ContextWindow, TokenEstimator, TruncationStrategy,
24};
25
26#[instrument(skip_all, fields(agent = %manifest.name, query_len = query.len()))]
28#[cfg_attr(
29 feature = "agents-contracts",
30 provable_contracts_macros::contract("agent-loop-v1", equation = "loop_termination")
31)]
32pub async fn run_agent_loop(
33 manifest: &AgentManifest,
34 query: &str,
35 driver: &dyn LlmDriver,
36 tools: &ToolRegistry,
37 memory: &dyn MemorySubstrate,
38 stream_tx: Option<mpsc::Sender<StreamEvent>>,
39) -> Result<AgentLoopResult, AgentError> {
40 let mut history = Vec::new();
41 run_agent_turn(manifest, &mut history, query, driver, tools, memory, stream_tx).await
42}
43
44pub async fn run_agent_loop_with_nudge(
46 manifest: &AgentManifest,
47 query: &str,
48 driver: &dyn LlmDriver,
49 tools: &ToolRegistry,
50 memory: &dyn MemorySubstrate,
51 stream_tx: Option<mpsc::Sender<StreamEvent>>,
52) -> Result<AgentLoopResult, AgentError> {
53 let mut history = Vec::new();
54 let r = run_agent_turn(manifest, &mut history, query, driver, tools, memory, stream_tx.clone())
55 .await?;
56 if r.tool_calls == 0 && tools.len() > 0 {
57 info!("no tool calls on first turn, nudging");
58 let nudge =
59 "Use a tool to answer. Emit a <tool_call> block with glob, file_read, or shell.";
60 return run_agent_turn(manifest, &mut history, nudge, driver, tools, memory, stream_tx)
61 .await;
62 }
63 Ok(r)
64}
65
66#[instrument(skip_all, fields(agent = %manifest.name, query_len = query.len(), history_len = history.len()))]
75pub async fn run_agent_turn(
76 manifest: &AgentManifest,
77 history: &mut Vec<Message>,
78 query: &str,
79 driver: &dyn LlmDriver,
80 tools: &ToolRegistry,
81 memory: &dyn MemorySubstrate,
82 stream_tx: Option<mpsc::Sender<StreamEvent>>,
83) -> Result<AgentLoopResult, AgentError> {
84 #[cfg(feature = "agents-mcp")]
87 validate_mcp_privacy(manifest)?;
88
89 let mut guard = LoopGuard::new(
90 manifest.resources.max_iterations,
91 manifest.resources.max_tool_calls,
92 manifest.resources.max_cost_usd,
93 )
94 .with_token_budget(manifest.resources.max_tokens_budget);
95
96 emit(stream_tx.as_ref(), StreamEvent::PhaseChange { phase: LoopPhase::Perceive }).await;
98
99 let system = build_system_prompt(manifest, query, memory).await;
100 let tool_defs = tools.definitions_for(&manifest.capabilities);
101 info!(
102 tools = tool_defs.len(),
103 capabilities = manifest.capabilities.len(),
104 history_messages = history.len(),
105 "agent turn initialized"
106 );
107 let context = build_context(driver, &system, &tool_defs, manifest);
108
109 let mut messages = history.clone();
111 messages.push(Message::User(query.to_string()));
112
113 let mut last_tool_sig: Option<String> = None; let mut repeat_count: u32 = 0;
115
116 loop {
117 check_verdict(guard.check_iteration())?;
118 debug!(
119 iteration = guard.current_iteration(),
120 tool_calls = guard.total_tool_calls(),
121 "loop iteration start"
122 );
123
124 emit(stream_tx.as_ref(), StreamEvent::PhaseChange { phase: LoopPhase::Reason }).await;
126
127 let response =
128 reason_step(driver, &messages, &tool_defs, manifest, &system, &context).await?;
129 check_verdict(guard.record_usage(&response.usage))?;
130
131 let cost = driver.estimate_cost(&response.usage);
133 check_verdict(guard.record_cost(cost))?;
134
135 match response.stop_reason {
136 StopReason::EndTurn | StopReason::StopSequence => {
137 info!(
138 iterations = guard.current_iteration(),
139 tool_calls = guard.total_tool_calls(),
140 "turn complete"
141 );
142 let new_start = history.len();
143 for msg in &messages[new_start..] {
144 history.push(msg.clone());
145 }
146 if !response.text.is_empty() {
147 history.push(Message::Assistant(response.text.clone()));
148 }
149 return finish_loop(&response, &guard, manifest, query, memory, stream_tx.as_ref())
150 .await;
151 }
152 StopReason::ToolUse => {
153 let sig = response.tool_calls.first().map(|tc| format!("{}:{}", tc.name, tc.input));
155 if sig == last_tool_sig {
156 repeat_count += 1;
157 } else {
158 last_tool_sig = sig;
159 repeat_count = 1;
160 }
161 if repeat_count >= 4 {
162 warn!("stuck loop: same tool call repeated {repeat_count} times");
163 return finish_loop(
164 &response,
165 &guard,
166 manifest,
167 query,
168 memory,
169 stream_tx.as_ref(),
170 )
171 .await;
172 }
173 debug!(num_calls = response.tool_calls.len(), "processing tool calls");
174 guard.reset_max_tokens();
175 handle_tool_calls(
176 &response,
177 &mut messages,
178 &mut guard,
179 manifest,
180 tools,
181 stream_tx.as_ref(),
182 )
183 .await?;
184 }
185 StopReason::MaxTokens => {
186 warn!("max tokens reached, continuing loop");
187 check_verdict(guard.record_max_tokens())?;
188 messages.push(Message::Assistant(response.text));
189 }
190 }
191 }
192}
193
194fn check_verdict(verdict: LoopVerdict) -> Result<(), AgentError> {
195 match verdict {
196 LoopVerdict::CircuitBreak(msg) | LoopVerdict::Block(msg) => {
197 Err(AgentError::CircuitBreak(msg))
198 }
199 LoopVerdict::Allow | LoopVerdict::Warn(_) => Ok(()),
200 }
201}
202
203async fn reason_step(
204 driver: &dyn LlmDriver,
205 messages: &[Message],
206 tool_defs: &[super::driver::ToolDefinition],
207 manifest: &AgentManifest,
208 system: &str,
209 context: &ContextManager,
210) -> Result<CompletionResponse, AgentError> {
211 let truncated_messages = truncate_messages(messages, context)?;
212
213 let request = CompletionRequest {
214 model: String::new(),
215 messages: truncated_messages,
216 tools: tool_defs.to_vec(),
217 max_tokens: manifest.model.max_tokens,
218 temperature: manifest.model.temperature,
219 system: Some(system.to_string()),
220 };
221
222 call_with_retry(driver, &request).await
223}
224
225async fn finish_loop(
226 response: &CompletionResponse,
227 guard: &LoopGuard,
228 manifest: &AgentManifest,
229 query: &str,
230 memory: &dyn MemorySubstrate,
231 stream_tx: Option<&mpsc::Sender<StreamEvent>>,
232) -> Result<AgentLoopResult, AgentError> {
233 let _ = memory
234 .remember(
235 &manifest.name,
236 &format!("Q: {query}\nA: {}", response.text),
237 MemorySource::Conversation,
238 None,
239 )
240 .await;
241
242 emit(stream_tx, StreamEvent::PhaseChange { phase: LoopPhase::Done }).await;
243
244 Ok(AgentLoopResult {
245 text: response.text.clone(),
246 usage: guard.usage().clone(),
247 iterations: guard.current_iteration(),
248 tool_calls: guard.total_tool_calls(),
249 })
250}
251
252async fn build_system_prompt(
253 manifest: &AgentManifest,
254 query: &str,
255 memory: &dyn MemorySubstrate,
256) -> String {
257 let memories = memory.recall(query, 5, None, None).await.unwrap_or_default();
258
259 let mut system = manifest.model.system_prompt.clone();
260 if !memories.is_empty() {
261 use std::fmt::Write;
262 system.push_str("\n\n## Recalled Context\n");
263 for m in &memories {
264 let _ = writeln!(system, "- {}", m.content);
265 }
266 }
267 system
268}
269
270fn build_context(
271 driver: &dyn LlmDriver,
272 system: &str,
273 tool_defs: &[super::driver::ToolDefinition],
274 manifest: &AgentManifest,
275) -> ContextManager {
276 let estimator = TokenEstimator::new();
277 let system_tokens = estimator.estimate(system);
278 let tool_json = serde_json::to_string(tool_defs).unwrap_or_default();
279 let tool_tokens = estimator.estimate(&tool_json);
280 let context_window = driver.context_window();
281 let effective_window = context_window.saturating_sub(system_tokens).saturating_sub(tool_tokens);
282 ContextManager::new(ContextConfig {
283 window: ContextWindow::new(effective_window, manifest.model.max_tokens as usize),
284 strategy: TruncationStrategy::SlidingWindow,
285 preserve_system: false,
286 min_messages: 2,
287 })
288}
289
290#[instrument(skip_all, fields(num_calls = response.tool_calls.len()))]
292async fn handle_tool_calls(
293 response: &CompletionResponse,
294 messages: &mut Vec<Message>,
295 guard: &mut LoopGuard,
296 manifest: &AgentManifest,
297 tools: &ToolRegistry,
298 stream_tx: Option<&mpsc::Sender<StreamEvent>>,
299) -> Result<(), AgentError> {
300 for call in &response.tool_calls {
301 let Some(tool) = tools.get(&call.name) else {
302 push_tool_error(messages, call, &format!("unknown tool: {}", call.name));
303 continue;
304 };
305
306 let cap = tool.required_capability();
308 if !capability_matches(&manifest.capabilities, &cap) {
309 push_tool_error(messages, call, &format!("capability denied for tool '{}'", call.name));
310 continue;
311 }
312
313 if manifest.privacy == crate::serve::backends::PrivacyTier::Sovereign
315 && matches!(cap, super::capability::Capability::Network { .. })
316 {
317 push_tool_error(messages, call, "sovereign privacy blocks network egress");
318 continue;
319 }
320
321 match guard.check_tool_call(&call.name, &call.input) {
323 LoopVerdict::Allow | LoopVerdict::Warn(_) => {}
324 LoopVerdict::Block(msg) => {
325 push_tool_error(messages, call, &msg);
326 continue;
327 }
328 LoopVerdict::CircuitBreak(msg) => {
329 return Err(AgentError::CircuitBreak(msg));
330 }
331 }
332
333 let result = execute_tool(call, tool, stream_tx).await;
335
336 messages.push(Message::AssistantToolUse(ToolCall {
337 id: call.id.clone(),
338 name: call.name.clone(),
339 input: call.input.clone(),
340 }));
341 messages.push(Message::ToolResult(ToolResultMsg {
342 tool_use_id: call.id.clone(),
343 content: result.content,
344 is_error: result.is_error,
345 }));
346 }
347 Ok(())
348}
349
350async fn execute_tool(
352 call: &ToolCall,
353 tool: &dyn super::tool::Tool,
354 stream_tx: Option<&mpsc::Sender<StreamEvent>>,
355) -> super::tool::ToolResult {
356 let tool_span = tracing::info_span!(
357 "tool_execute",
358 tool = %call.name,
359 id = %call.id,
360 );
361 let _enter = tool_span.enter();
362
363 emit(
364 stream_tx,
365 StreamEvent::PhaseChange { phase: LoopPhase::Act { tool_name: call.name.clone() } },
366 )
367 .await;
368
369 emit(stream_tx, StreamEvent::ToolUseStart { id: call.id.clone(), name: call.name.clone() })
370 .await;
371
372 let result = tokio::time::timeout(tool.timeout(), tool.execute(call.input.clone()))
373 .await
374 .unwrap_or_else(|elapsed| {
375 warn!(tool = %call.name, timeout = ?elapsed, "tool execution timed out");
376 super::tool::ToolResult::error(format!(
377 "tool '{}' timed out after {:?}",
378 call.name, elapsed
379 ))
380 })
381 .sanitized(); debug!(
384 tool = %call.name,
385 is_error = result.is_error,
386 output_len = result.content.len(),
387 "tool execution complete"
388 );
389
390 emit(
391 stream_tx,
392 StreamEvent::ToolUseEnd {
393 id: call.id.clone(),
394 name: call.name.clone(),
395 result: result.content.clone(),
396 },
397 )
398 .await;
399
400 result
401}
402
403fn push_tool_error(messages: &mut Vec<Message>, call: &ToolCall, error: &str) {
404 messages.push(Message::AssistantToolUse(ToolCall {
405 id: call.id.clone(),
406 name: call.name.clone(),
407 input: call.input.clone(),
408 }));
409 messages.push(Message::ToolResult(ToolResultMsg {
410 tool_use_id: call.id.clone(),
411 content: error.to_string(),
412 is_error: true,
413 }));
414}
415
416#[cfg(feature = "agents-mcp")]
419use super::runtime_helpers::validate_mcp_privacy;
420#[cfg(test)]
421#[path = "runtime_tests.rs"]
422mod tests;
423#[cfg(test)]
424#[path = "runtime_tests_advanced.rs"]
425mod tests_advanced;
426#[cfg(test)]
427#[path = "runtime_tests_guards.rs"]
428mod tests_guards;
429#[cfg(test)]
430#[path = "runtime_tests_multi_turn.rs"]
431mod tests_multi_turn;