Skip to main content

batuta/agent/
runtime.rs

1//! Agent runtime — the core perceive-reason-act loop.
2//!
3//! Orchestrates perceive (recall) → reason (LLM) → act (tools) → guard (Jidoka).
4//! See: arXiv:2512.10350 (loop dynamics), arXiv:2501.09136 (agentic RAG).
5
6use std::sync::Arc;
7
8use tokio::sync::mpsc;
9use tracing::{debug, info, instrument, warn};
10
11use super::capability::capability_matches;
12use super::driver::{
13    CompletionRequest, CompletionResponse, LlmDriver, Message, StreamEvent, ToolCall, ToolResultMsg,
14};
15use super::guard::{LoopGuard, LoopVerdict};
16use super::manifest::AgentManifest;
17use super::memory::{MemorySource, MemorySubstrate};
18use super::phase::LoopPhase;
19use super::result::{AgentError, AgentLoopResult, StopReason};
20use super::runtime_helpers::{call_with_retry, emit, truncate_messages};
21use super::tool::ToolRegistry;
22use crate::serve::context::{
23    ContextConfig, ContextManager, ContextWindow, TokenEstimator, TruncationStrategy,
24};
25
26/// Run the agent loop to completion (single-turn, no history).
27#[instrument(skip_all, fields(agent = %manifest.name, query_len = query.len()))]
28#[cfg_attr(
29    feature = "agents-contracts",
30    provable_contracts_macros::contract("agent-loop-v1", equation = "loop_termination")
31)]
32pub async fn run_agent_loop(
33    manifest: &AgentManifest,
34    query: &str,
35    driver: &dyn LlmDriver,
36    tools: &ToolRegistry,
37    memory: &dyn MemorySubstrate,
38    stream_tx: Option<mpsc::Sender<StreamEvent>>,
39) -> Result<AgentLoopResult, AgentError> {
40    let mut history = Vec::new();
41    run_agent_turn(manifest, &mut history, query, driver, tools, memory, stream_tx).await
42}
43
44/// PMAT-177: Agent loop with tool-use nudge. If first turn has no tool calls, retries once.
45pub async fn run_agent_loop_with_nudge(
46    manifest: &AgentManifest,
47    query: &str,
48    driver: &dyn LlmDriver,
49    tools: &ToolRegistry,
50    memory: &dyn MemorySubstrate,
51    stream_tx: Option<mpsc::Sender<StreamEvent>>,
52) -> Result<AgentLoopResult, AgentError> {
53    let mut history = Vec::new();
54    let r = run_agent_turn(manifest, &mut history, query, driver, tools, memory, stream_tx.clone())
55        .await?;
56    if r.tool_calls == 0 && tools.len() > 0 {
57        info!("no tool calls on first turn, nudging");
58        let nudge =
59            "Use a tool to answer. Emit a <tool_call> block with glob, file_read, or shell.";
60        return run_agent_turn(manifest, &mut history, nudge, driver, tools, memory, stream_tx)
61            .await;
62    }
63    Ok(r)
64}
65
66/// Run one agent turn with full conversation history (multi-turn).
67///
68/// Appends the new query and all resulting messages (tool calls, assistant
69/// response) to `history`. On next call, prior turns provide context so
70/// the agent can maintain coherent multi-turn conversation.
71///
72/// This is the core primitive for the REPL — each user prompt is one turn,
73/// and `history` accumulates across the session.
74#[instrument(skip_all, fields(agent = %manifest.name, query_len = query.len(), history_len = history.len()))]
75pub async fn run_agent_turn(
76    manifest: &AgentManifest,
77    history: &mut Vec<Message>,
78    query: &str,
79    driver: &dyn LlmDriver,
80    tools: &ToolRegistry,
81    memory: &dyn MemorySubstrate,
82    stream_tx: Option<mpsc::Sender<StreamEvent>>,
83) -> Result<AgentLoopResult, AgentError> {
84    // ═══ PRIVACY GATE (Poka-Yoke) ═══
85    // Defense-in-depth: block non-local MCP transports under Sovereign tier.
86    #[cfg(feature = "agents-mcp")]
87    validate_mcp_privacy(manifest)?;
88
89    let mut guard = LoopGuard::new(
90        manifest.resources.max_iterations,
91        manifest.resources.max_tool_calls,
92        manifest.resources.max_cost_usd,
93    )
94    .with_token_budget(manifest.resources.max_tokens_budget);
95
96    // ═══ PERCEIVE ═══
97    emit(stream_tx.as_ref(), StreamEvent::PhaseChange { phase: LoopPhase::Perceive }).await;
98
99    let system = build_system_prompt(manifest, query, memory).await;
100    let tool_defs = tools.definitions_for(&manifest.capabilities);
101    info!(
102        tools = tool_defs.len(),
103        capabilities = manifest.capabilities.len(),
104        history_messages = history.len(),
105        "agent turn initialized"
106    );
107    let context = build_context(driver, &system, &tool_defs, manifest);
108
109    // Build messages: prior history + new user query
110    let mut messages = history.clone();
111    messages.push(Message::User(query.to_string()));
112
113    let mut last_tool_sig: Option<String> = None; // PMAT-172: stuck-loop detection
114    let mut repeat_count: u32 = 0;
115
116    loop {
117        check_verdict(guard.check_iteration())?;
118        debug!(
119            iteration = guard.current_iteration(),
120            tool_calls = guard.total_tool_calls(),
121            "loop iteration start"
122        );
123
124        // ═══ REASON ═══
125        emit(stream_tx.as_ref(), StreamEvent::PhaseChange { phase: LoopPhase::Reason }).await;
126
127        let response =
128            reason_step(driver, &messages, &tool_defs, manifest, &system, &context).await?;
129        check_verdict(guard.record_usage(&response.usage))?;
130
131        // INV-005: Estimate cost and enforce budget (Muda)
132        let cost = driver.estimate_cost(&response.usage);
133        check_verdict(guard.record_cost(cost))?;
134
135        match response.stop_reason {
136            StopReason::EndTurn | StopReason::StopSequence => {
137                info!(
138                    iterations = guard.current_iteration(),
139                    tool_calls = guard.total_tool_calls(),
140                    "turn complete"
141                );
142                let new_start = history.len();
143                for msg in &messages[new_start..] {
144                    history.push(msg.clone());
145                }
146                if !response.text.is_empty() {
147                    history.push(Message::Assistant(response.text.clone()));
148                }
149                return finish_loop(&response, &guard, manifest, query, memory, stream_tx.as_ref())
150                    .await;
151            }
152            StopReason::ToolUse => {
153                // PMAT-172: detect stuck loops (4+ identical tool calls → break)
154                let sig = response.tool_calls.first().map(|tc| format!("{}:{}", tc.name, tc.input));
155                if sig == last_tool_sig {
156                    repeat_count += 1;
157                } else {
158                    last_tool_sig = sig;
159                    repeat_count = 1;
160                }
161                if repeat_count >= 4 {
162                    warn!("stuck loop: same tool call repeated {repeat_count} times");
163                    return finish_loop(
164                        &response,
165                        &guard,
166                        manifest,
167                        query,
168                        memory,
169                        stream_tx.as_ref(),
170                    )
171                    .await;
172                }
173                debug!(num_calls = response.tool_calls.len(), "processing tool calls");
174                guard.reset_max_tokens();
175                handle_tool_calls(
176                    &response,
177                    &mut messages,
178                    &mut guard,
179                    manifest,
180                    tools,
181                    stream_tx.as_ref(),
182                )
183                .await?;
184            }
185            StopReason::MaxTokens => {
186                warn!("max tokens reached, continuing loop");
187                check_verdict(guard.record_max_tokens())?;
188                messages.push(Message::Assistant(response.text));
189            }
190        }
191    }
192}
193
194fn check_verdict(verdict: LoopVerdict) -> Result<(), AgentError> {
195    match verdict {
196        LoopVerdict::CircuitBreak(msg) | LoopVerdict::Block(msg) => {
197            Err(AgentError::CircuitBreak(msg))
198        }
199        LoopVerdict::Allow | LoopVerdict::Warn(_) => Ok(()),
200    }
201}
202
203async fn reason_step(
204    driver: &dyn LlmDriver,
205    messages: &[Message],
206    tool_defs: &[super::driver::ToolDefinition],
207    manifest: &AgentManifest,
208    system: &str,
209    context: &ContextManager,
210) -> Result<CompletionResponse, AgentError> {
211    let truncated_messages = truncate_messages(messages, context)?;
212
213    let request = CompletionRequest {
214        model: String::new(),
215        messages: truncated_messages,
216        tools: tool_defs.to_vec(),
217        max_tokens: manifest.model.max_tokens,
218        temperature: manifest.model.temperature,
219        system: Some(system.to_string()),
220    };
221
222    call_with_retry(driver, &request).await
223}
224
225async fn finish_loop(
226    response: &CompletionResponse,
227    guard: &LoopGuard,
228    manifest: &AgentManifest,
229    query: &str,
230    memory: &dyn MemorySubstrate,
231    stream_tx: Option<&mpsc::Sender<StreamEvent>>,
232) -> Result<AgentLoopResult, AgentError> {
233    let _ = memory
234        .remember(
235            &manifest.name,
236            &format!("Q: {query}\nA: {}", response.text),
237            MemorySource::Conversation,
238            None,
239        )
240        .await;
241
242    emit(stream_tx, StreamEvent::PhaseChange { phase: LoopPhase::Done }).await;
243
244    Ok(AgentLoopResult {
245        text: response.text.clone(),
246        usage: guard.usage().clone(),
247        iterations: guard.current_iteration(),
248        tool_calls: guard.total_tool_calls(),
249    })
250}
251
252async fn build_system_prompt(
253    manifest: &AgentManifest,
254    query: &str,
255    memory: &dyn MemorySubstrate,
256) -> String {
257    let memories = memory.recall(query, 5, None, None).await.unwrap_or_default();
258
259    let mut system = manifest.model.system_prompt.clone();
260    if !memories.is_empty() {
261        use std::fmt::Write;
262        system.push_str("\n\n## Recalled Context\n");
263        for m in &memories {
264            let _ = writeln!(system, "- {}", m.content);
265        }
266    }
267    system
268}
269
270fn build_context(
271    driver: &dyn LlmDriver,
272    system: &str,
273    tool_defs: &[super::driver::ToolDefinition],
274    manifest: &AgentManifest,
275) -> ContextManager {
276    let estimator = TokenEstimator::new();
277    let system_tokens = estimator.estimate(system);
278    let tool_json = serde_json::to_string(tool_defs).unwrap_or_default();
279    let tool_tokens = estimator.estimate(&tool_json);
280    let context_window = driver.context_window();
281    let effective_window = context_window.saturating_sub(system_tokens).saturating_sub(tool_tokens);
282    ContextManager::new(ContextConfig {
283        window: ContextWindow::new(effective_window, manifest.model.max_tokens as usize),
284        strategy: TruncationStrategy::SlidingWindow,
285        preserve_system: false,
286        min_messages: 2,
287    })
288}
289
290/// Process tool calls from a completion response.
291#[instrument(skip_all, fields(num_calls = response.tool_calls.len()))]
292async fn handle_tool_calls(
293    response: &CompletionResponse,
294    messages: &mut Vec<Message>,
295    guard: &mut LoopGuard,
296    manifest: &AgentManifest,
297    tools: &ToolRegistry,
298    stream_tx: Option<&mpsc::Sender<StreamEvent>>,
299) -> Result<(), AgentError> {
300    for call in &response.tool_calls {
301        let Some(tool) = tools.get(&call.name) else {
302            push_tool_error(messages, call, &format!("unknown tool: {}", call.name));
303            continue;
304        };
305
306        // Poka-Yoke: capability check
307        let cap = tool.required_capability();
308        if !capability_matches(&manifest.capabilities, &cap) {
309            push_tool_error(messages, call, &format!("capability denied for tool '{}'", call.name));
310            continue;
311        }
312
313        // Poka-Yoke: sovereign privacy blocks network egress
314        if manifest.privacy == crate::serve::backends::PrivacyTier::Sovereign
315            && matches!(cap, super::capability::Capability::Network { .. })
316        {
317            push_tool_error(messages, call, "sovereign privacy blocks network egress");
318            continue;
319        }
320
321        // Jidoka: loop guard check
322        match guard.check_tool_call(&call.name, &call.input) {
323            LoopVerdict::Allow | LoopVerdict::Warn(_) => {}
324            LoopVerdict::Block(msg) => {
325                push_tool_error(messages, call, &msg);
326                continue;
327            }
328            LoopVerdict::CircuitBreak(msg) => {
329                return Err(AgentError::CircuitBreak(msg));
330            }
331        }
332
333        // ═══ ACT ═══
334        let result = execute_tool(call, tool, stream_tx).await;
335
336        messages.push(Message::AssistantToolUse(ToolCall {
337            id: call.id.clone(),
338            name: call.name.clone(),
339            input: call.input.clone(),
340        }));
341        messages.push(Message::ToolResult(ToolResultMsg {
342            tool_use_id: call.id.clone(),
343            content: result.content,
344            is_error: result.is_error,
345        }));
346    }
347    Ok(())
348}
349
350/// Execute a single tool call with tracing, timeout, and sanitization.
351async fn execute_tool(
352    call: &ToolCall,
353    tool: &dyn super::tool::Tool,
354    stream_tx: Option<&mpsc::Sender<StreamEvent>>,
355) -> super::tool::ToolResult {
356    let tool_span = tracing::info_span!(
357        "tool_execute",
358        tool = %call.name,
359        id = %call.id,
360    );
361    let _enter = tool_span.enter();
362
363    emit(
364        stream_tx,
365        StreamEvent::PhaseChange { phase: LoopPhase::Act { tool_name: call.name.clone() } },
366    )
367    .await;
368
369    emit(stream_tx, StreamEvent::ToolUseStart { id: call.id.clone(), name: call.name.clone() })
370        .await;
371
372    let result = tokio::time::timeout(tool.timeout(), tool.execute(call.input.clone()))
373        .await
374        .unwrap_or_else(|elapsed| {
375            warn!(tool = %call.name, timeout = ?elapsed, "tool execution timed out");
376            super::tool::ToolResult::error(format!(
377                "tool '{}' timed out after {:?}",
378                call.name, elapsed
379            ))
380        })
381        .sanitized(); // Poka-Yoke: strip injection patterns from tool output
382
383    debug!(
384        tool = %call.name,
385        is_error = result.is_error,
386        output_len = result.content.len(),
387        "tool execution complete"
388    );
389
390    emit(
391        stream_tx,
392        StreamEvent::ToolUseEnd {
393            id: call.id.clone(),
394            name: call.name.clone(),
395            result: result.content.clone(),
396        },
397    )
398    .await;
399
400    result
401}
402
403fn push_tool_error(messages: &mut Vec<Message>, call: &ToolCall, error: &str) {
404    messages.push(Message::AssistantToolUse(ToolCall {
405        id: call.id.clone(),
406        name: call.name.clone(),
407        input: call.input.clone(),
408    }));
409    messages.push(Message::ToolResult(ToolResultMsg {
410        tool_use_id: call.id.clone(),
411        content: error.to_string(),
412        is_error: true,
413    }));
414}
415
416// truncate_messages, call_with_retry, emit, validate_mcp_privacy
417// extracted to runtime_helpers.rs (PMAT-190: keep under 500-line threshold)
418#[cfg(feature = "agents-mcp")]
419use super::runtime_helpers::validate_mcp_privacy;
420#[cfg(test)]
421#[path = "runtime_tests.rs"]
422mod tests;
423#[cfg(test)]
424#[path = "runtime_tests_advanced.rs"]
425mod tests_advanced;
426#[cfg(test)]
427#[path = "runtime_tests_guards.rs"]
428mod tests_guards;
429#[cfg(test)]
430#[path = "runtime_tests_multi_turn.rs"]
431mod tests_multi_turn;