Skip to main content

heartbit_core/agent/
runner.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use serde::{Deserialize, Serialize};
6use tracing::{Instrument, debug, info_span};
7
8use crate::error::Error;
9use crate::llm::LlmProvider;
10use crate::llm::types::{
11    CompletionRequest, ContentBlock, Message, StopReason, TokenUsage, ToolCall, ToolDefinition,
12    ToolResult,
13};
14use crate::memory::Memory;
15use crate::tool::{Tool, ToolOutput, validate_tool_input};
16use crate::util::levenshtein;
17
18use super::audit::{AuditRecord, AuditTrail};
19use super::builder::AgentRunnerBuilder;
20use super::cache;
21use super::context::{AgentContext, ContextStrategy};
22use super::doom_loop::DoomLoopTracker;
23use super::events::{AgentEvent, EVENT_MAX_PAYLOAD_BYTES, OnEvent, truncate_for_event};
24use super::guardrail::{GuardAction, Guardrail};
25use super::observability;
26use super::permission;
27use super::pruner;
28use super::tool_filter;
29
/// Hook used in interactive mode to obtain the next user message.
///
/// The runner invokes it when the LLM answers with text and no tool calls.
/// The returned future resolves to `Some(message)` to continue the
/// conversation, or to `None` to end the session.
pub type OnInput = dyn Fn() -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Option<String>> + Send>,
    > + Send
    + Sync;
36
/// Resourcefulness rules appended to every agent's system prompt.
///
/// They push the agent to investigate (look for CLIs, files, docs, and
/// alternative approaches) before it claims it cannot do something. The text
/// starts with two newlines so it separates cleanly from the prompt it follows.
pub(crate) const RESOURCEFULNESS_GUIDELINES: &str = concat!(
    "\n\n",
    "## Resourcefulness\n",
    "Before claiming you cannot do something or lack access to a tool:\n",
    "- Use bash to check for installed CLIs (`which <tool>`, `command -v <tool>`).\n",
    "- Search for files, configs, and resources before saying they don't exist.\n",
    "- Read documentation, help output (`<tool> --help`), and man pages when unsure.\n",
    "- Try alternative approaches when the first attempt fails.\n",
    "Never say \"I don't have access\" or \"I can't\" without evidence. Investigate first.",
);
48
49/// Output of a completed agent run.
50///
51/// Returned by [`AgentRunner::execute`] on success. Contains the agent's
52/// final text response and usage accounting for the entire run.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct AgentOutput {
55    /// The agent's final text response.
56    pub result: String,
57    /// Total number of tool calls made during the run.
58    pub tool_calls_made: usize,
59    /// Aggregate token usage for the entire run.
60    pub tokens_used: TokenUsage,
61    /// Structured output when the agent was configured with a response schema.
62    /// Contains the validated JSON conforming to the schema.
63    pub structured: Option<serde_json::Value>,
64    /// Estimated cost in USD based on model pricing. `None` if the model is
65    /// unknown or cost estimation is not available.
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub estimated_cost_usd: Option<f64>,
68    /// The model name used for this run. For cascading providers, this is the
69    /// last model that produced a response.
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub model_name: Option<String>,
72}
73
74impl AgentOutput {
75    /// Accumulate this output's usage, tool calls, and cost into running totals.
76    pub(crate) fn accumulate_into(
77        &self,
78        total_usage: &mut TokenUsage,
79        total_tool_calls: &mut usize,
80        total_cost: &mut Option<f64>,
81    ) {
82        *total_usage += self.tokens_used;
83        *total_tool_calls += self.tool_calls_made;
84        if let Some(cost) = self.estimated_cost_usd {
85            *total_cost.get_or_insert(0.0) += cost;
86        }
87    }
88}
89
/// Runs an agent loop: LLM call → tool execution → repeat until done.
///
/// Create one with [`AgentRunner::builder`]. A runner is reused across runs via
/// `&self`; per-run state lives in an `AgentContext` built by `execute` /
/// `execute_with_content`.
pub struct AgentRunner<P: LlmProvider> {
    /// LLM provider used for completion calls and for model-name/pricing lookups.
    pub(super) provider: Arc<P>,
    /// Agent name; attached to emitted events, tracing spans and audit records.
    pub(super) name: String,
    /// System prompt used to seed each run's `AgentContext`.
    pub(super) system_prompt: String,
    /// Registered tools. NOTE(review): presumably keyed by tool name — confirm
    /// against `AgentRunnerBuilder::build`.
    pub(super) tools: HashMap<String, Arc<dyn Tool>>,
    /// Tool definitions cloned into each run's `AgentContext`.
    pub(super) tool_defs: Vec<ToolDefinition>,
    /// Turn limit applied to each run via `with_max_turns`; a run that reaches
    /// it fails with `Error::MaxTurnsExceeded`.
    pub(super) max_turns: usize,
    /// Token limit applied to each run's context via `with_max_tokens`.
    pub(super) max_tokens: u32,
    /// Context strategy cloned into each run's context via `with_context_strategy`.
    pub(super) context_strategy: ContextStrategy,
    /// Token threshold at which to trigger summarization. `None` = no summarization.
    pub(super) summarize_threshold: Option<u32>,
    /// Optional callback for streaming text output.
    pub(super) on_text: Option<Arc<crate::llm::OnText>>,
    /// Optional callback for human-in-the-loop approval before tool execution.
    pub(super) on_approval: Option<Arc<crate::llm::OnApproval>>,
    /// Optional timeout for individual tool executions.
    pub(super) tool_timeout: Option<Duration>,
    /// Optional maximum byte size for tool output content. Oversized results
    /// are truncated with a `[truncated: N bytes omitted]` suffix.
    pub(super) max_tool_output_bytes: Option<usize>,
    /// When set, a synthetic `respond` tool is injected with this JSON Schema.
    /// The agent calls `respond` to produce structured output conforming to the schema.
    pub(super) structured_schema: Option<serde_json::Value>,
    /// Optional callback for structured agent events.
    pub(super) on_event: Option<Arc<OnEvent>>,
    /// Guardrails applied to LLM calls and tool executions.
    pub(super) guardrails: Vec<Arc<dyn Guardrail>>,
    /// Optional callback for interactive mode. When set and the LLM returns
    /// text without tool calls, the callback is invoked to get the next user
    /// message instead of returning immediately.
    pub(super) on_input: Option<Arc<OnInput>>,
    /// Optional wall-clock deadline for the entire run. When set, the full
    /// `execute` call (all turns) is wrapped in `tokio::time::timeout`.
    pub(super) run_timeout: Option<Duration>,
    /// Optional reasoning/thinking effort level for models that support it.
    pub(super) reasoning_effort: Option<crate::llm::types::ReasoningEffort>,
    /// When true, inject a reflection prompt after tool results to encourage
    /// the agent to assess results before the next action (Reflexion/CRITIC pattern).
    pub(super) enable_reflection: bool,
    /// When set, tool outputs exceeding this byte threshold are compressed
    /// via an LLM call that preserves factual content while removing redundancy.
    pub(super) tool_output_compression_threshold: Option<usize>,
    /// When set, limits the number of tool definitions sent per LLM turn.
    /// Tools are selected based on recent usage and keyword relevance.
    pub(super) max_tools_per_turn: Option<usize>,
    /// When set, pre-filters tool definitions based on query classification
    /// before dynamic selection. Reduces token usage for simple queries.
    pub(super) tool_profile: Option<tool_filter::ToolProfile>,
    /// Maximum number of consecutive identical tool-call turns before the
    /// agent receives an error result instead of executing the tools. `None`
    /// disables doom loop detection.
    pub(super) max_identical_tool_calls: Option<u32>,
    /// Maximum number of consecutive fuzzy-identical tool-call turns before
    /// doom loop detection triggers. Fuzzy matching compares sorted tool names
    /// (ignoring inputs). `None` disables fuzzy detection.
    pub(super) max_fuzzy_identical_tool_calls: Option<u32>,
    /// Hard cap on the number of tool invocations per LLM turn. When the LLM
    /// emits more tool_use blocks than this limit, the run fails with
    /// `Error::Agent` (wrapped in `Error::WithPartialUsage`). `None` = unlimited.
    pub(super) max_tool_calls_per_turn: Option<u32>,
    /// Declarative permission rules evaluated per tool call before the
    /// `on_approval` callback. `Allow` → execute, `Deny` → error result,
    /// `Ask` → fall through to `on_approval`.
    ///
    /// Wrapped in `RwLock` for interior mutability: learned rules from
    /// `AlwaysAllow`/`AlwaysDeny` are injected at runtime via `&self`.
    /// Lock is never held across `.await`.
    pub(super) permission_rules: parking_lot::RwLock<permission::PermissionRuleset>,
    /// Optional learned permissions for persisting AlwaysAllow/AlwaysDeny decisions.
    pub(super) learned_permissions: Option<Arc<std::sync::Mutex<permission::LearnedPermissions>>>,
    /// Optional LSP manager for collecting diagnostics after file-modifying tools.
    pub(super) lsp_manager: Option<Arc<crate::lsp::LspManager>>,
    /// Optional session pruning config. When set, old tool results are truncated
    /// before each LLM call to reduce token usage.
    pub(super) session_prune_config: Option<pruner::SessionPruneConfig>,
    /// Optional memory store reference for pre-compaction flush.
    pub(super) memory: Option<Arc<dyn Memory>>,
    /// When true, use recursive (cluster-then-summarize) summarization for
    /// long conversations instead of single-shot.
    pub(super) enable_recursive_summarization: bool,
    /// When true, run memory consolidation at session end.
    pub(super) consolidate_on_exit: bool,
    /// Observability verbosity level controlling span attribute recording.
    pub(super) observability_mode: observability::ObservabilityMode,
    /// Hard limit on cumulative tokens (input + output) across all turns.
    /// When exceeded, the agent returns `Error::BudgetExceeded`.
    pub(super) max_total_tokens: Option<u64>,
    /// Controls whether audit records include full content or metadata only.
    pub(super) audit_mode: super::audit::AuditMode,
    /// Optional audit trail for recording untruncated agent decisions.
    pub(super) audit_trail: Option<Arc<dyn AuditTrail>>,
    /// Optional user identifier for multi-tenant audit enrichment. Also part of
    /// the response-cache namespace so cached responses are not shared across users.
    pub(super) audit_user_id: Option<String>,
    /// Optional tenant identifier for multi-tenant audit enrichment. Also part of
    /// the response-cache namespace so cached responses are not shared across tenants.
    pub(super) audit_tenant_id: Option<String>,
    /// Delegation chain for audit records (e.g., `["heartbit-agent"]` when acting on behalf of user).
    pub(super) audit_delegation_chain: Vec<String>,
    /// Optional LRU cache for LLM completion responses. Skips the LLM call
    /// when an identical request (system prompt + messages + tool names) is found.
    pub(super) response_cache: Option<cache::ResponseCache>,
    /// Optional per-tenant in-flight token tracker. When set, `adjust()` is called
    /// after each LLM response to reconcile actual vs. estimated usage.
    pub(super) tenant_tracker: Option<Arc<crate::agent::tenant_tracker::TenantTokenTracker>>,
    /// Cumulative actual tokens (input + output) across all turns for this runner.
    /// Used to compute signed deltas for `tenant_tracker.adjust()` and to release
    /// the full amount on `Drop`.
    pub(super) cumulative_actual_tokens: std::sync::atomic::AtomicUsize,
}
198
199impl<P: LlmProvider> AgentRunner<P> {
200    /// Create a new [`AgentRunnerBuilder`] for an agent backed by `provider`.
201    ///
202    /// The builder uses sensible defaults (10 turns, 4096 tokens) so the
203    /// minimum required configuration is just a system prompt.
204    ///
205    /// # Example
206    ///
207    /// ```rust,no_run
208    /// use std::sync::Arc;
209    /// use heartbit_core::{AgentRunner, AnthropicProvider, BoxedProvider};
210    ///
211    /// # async fn run() -> Result<(), heartbit_core::Error> {
212    /// let provider = Arc::new(BoxedProvider::new(AnthropicProvider::new(
213    ///     "sk-...",
214    ///     "claude-sonnet-4-20250514",
215    /// )));
216    /// let agent = AgentRunner::builder(provider)
217    ///     .system_prompt("You are a helpful assistant.")
218    ///     .build()?;
219    /// # let _ = agent;
220    /// # Ok(()) }
221    /// ```
222    pub fn builder(provider: Arc<P>) -> AgentRunnerBuilder<P> {
223        AgentRunnerBuilder {
224            provider,
225            name: "agent".into(),
226            system_prompt: String::new(),
227            tools: Vec::new(),
228            max_turns: 10,
229            max_tokens: 4096,
230            context_strategy: None,
231            summarize_threshold: None,
232            memory: None,
233            knowledge_base: None,
234            on_text: None,
235            on_approval: None,
236            tool_timeout: None,
237            max_tool_output_bytes: None,
238            structured_schema: None,
239            on_event: None,
240            guardrails: Vec::new(),
241            on_question: None,
242            on_input: None,
243            run_timeout: None,
244            reasoning_effort: None,
245            enable_reflection: false,
246            tool_output_compression_threshold: None,
247            max_tools_per_turn: None,
248            tool_profile: None,
249            max_identical_tool_calls: None,
250            max_fuzzy_identical_tool_calls: None,
251            max_tool_calls_per_turn: None,
252            permission_rules: permission::PermissionRuleset::default(),
253            instruction_text: None,
254            learned_permissions: None,
255            lsp_manager: None,
256            session_prune_config: None,
257            enable_recursive_summarization: false,
258            reflection_threshold: None,
259            consolidate_on_exit: false,
260            observability_mode: None,
261            workspace: None,
262            max_total_tokens: None,
263            audit_mode: super::audit::AuditMode::Full,
264            audit_trail: None,
265            audit_user_id: None,
266            audit_tenant_id: None,
267            audit_delegation_chain: Vec::new(),
268            response_cache_size: None,
269            tenant_tracker: None,
270        }
271    }
272
273    /// Returns the agent's name.
274    pub fn name(&self) -> &str {
275        &self.name
276    }
277
278    /// Read-access to the permission rules (acquires read lock).
279    fn eval_permission(
280        &self,
281        tool_name: &str,
282        input: &serde_json::Value,
283    ) -> Option<permission::PermissionAction> {
284        self.permission_rules.read().evaluate(tool_name, input)
285    }
286
287    /// Check if the permission ruleset has any rules.
288    fn has_permission_rules(&self) -> bool {
289        !self.permission_rules.read().is_empty()
290    }
291
292    fn emit(&self, event: AgentEvent) {
293        if let Some(ref cb) = self.on_event {
294            cb(event);
295        }
296    }
297
298    /// Record an audit entry (best-effort). Failures are logged, never abort the agent.
299    async fn audit(&self, mut record: AuditRecord) {
300        if let Some(ref trail) = self.audit_trail {
301            if self.audit_mode == super::audit::AuditMode::MetadataOnly {
302                // Owned variant skips the top-level + per-scalar clones
303                // (P-CROSS-7) — ~1 ms saved per record on 100 KB payloads.
304                let payload = std::mem::take(&mut record.payload);
305                record.payload = super::audit::strip_content_owned(payload);
306            }
307            if let Err(e) = trail.record(record).await {
308                tracing::warn!(error = %e, "audit record failed");
309            }
310        }
311    }
312
313    /// Persist an AlwaysAllow/AlwaysDeny decision as a learned permission rule.
314    ///
315    /// For each distinct tool name in the tool calls, a tool-level rule is created
316    /// (`pattern: "*"`). The rule is added to both the in-memory ruleset and the
317    /// on-disk learned permissions file.
318    fn persist_approval_decision(
319        &self,
320        tool_calls: &[ToolCall],
321        decision: crate::llm::ApprovalDecision,
322    ) {
323        let action = if decision.is_allowed() {
324            permission::PermissionAction::Allow
325        } else {
326            permission::PermissionAction::Deny
327        };
328        // Collect distinct tool names
329        let mut seen = std::collections::HashSet::new();
330        let mut new_rules = Vec::new();
331        for tc in tool_calls {
332            if seen.insert(tc.name.clone()) {
333                new_rules.push(permission::PermissionRule {
334                    tool: tc.name.clone(),
335                    pattern: "*".into(),
336                    action,
337                });
338            }
339        }
340        // Inject into the live ruleset so the rule takes effect immediately
341        // within this session (not just after restart).
342        self.permission_rules.write().append_rules(&new_rules);
343        // Persist to disk if learned permissions are configured
344        if let Some(ref learned) = self.learned_permissions {
345            for rule in new_rules {
346                if let Ok(mut guard) = learned.lock()
347                    && let Err(e) = guard.add_rule(rule)
348                {
349                    tracing::warn!(
350                        error = %e,
351                        "failed to persist learned permission rule"
352                    );
353                }
354            }
355        }
356    }
357
358    /// Estimate cost in USD based on model pricing and accumulated token usage.
359    fn estimate_cost(&self, usage: &TokenUsage) -> Option<f64> {
360        self.provider
361            .model_name()
362            .and_then(|model| crate::llm::pricing::estimate_cost(model, usage))
363    }
364
365    /// Run the agent on `task` and return the final output.
366    pub async fn execute(&self, task: &str) -> Result<AgentOutput, Error> {
367        let ctx = AgentContext::new(&self.system_prompt, task, self.tool_defs.clone())
368            .with_max_turns(self.max_turns)
369            .with_max_tokens(self.max_tokens)
370            .with_context_strategy(self.context_strategy.clone())
371            .with_reasoning_effort(self.reasoning_effort);
372        self.execute_with_context(ctx, task).await
373    }
374
375    /// Execute with pre-built multimodal content blocks (e.g., text + images).
376    pub async fn execute_with_content(
377        &self,
378        content: Vec<ContentBlock>,
379    ) -> Result<AgentOutput, Error> {
380        // Extract text for event/span descriptions
381        let task_summary: String = content
382            .iter()
383            .filter_map(|b| match b {
384                ContentBlock::Text { text } => Some(text.as_str()),
385                _ => None,
386            })
387            .collect::<Vec<_>>()
388            .join(" ");
389
390        let ctx = AgentContext::from_content(&self.system_prompt, content, self.tool_defs.clone())
391            .with_max_turns(self.max_turns)
392            .with_max_tokens(self.max_tokens)
393            .with_context_strategy(self.context_strategy.clone())
394            .with_reasoning_effort(self.reasoning_effort);
395        self.execute_with_context(ctx, &task_summary).await
396    }
397
398    async fn execute_with_context(
399        &self,
400        ctx: AgentContext,
401        task_description: &str,
402    ) -> Result<AgentOutput, Error> {
403        // Shared accumulator so we can retrieve partial usage even when the
404        // future is dropped by tokio::time::timeout.
405        let usage_acc = Arc::new(std::sync::Mutex::new(TokenUsage::default()));
406        let fut = {
407            let acc = usage_acc.clone();
408            async move {
409                match self.execute_inner(ctx, task_description, acc).await {
410                    Ok(output) => Ok(output),
411                    Err((e, usage)) => Err(e.with_partial_usage(usage)),
412                }
413            }
414        };
415        let mut result = match self.run_timeout {
416            Some(timeout) => match tokio::time::timeout(timeout, fut).await {
417                Ok(result) => result,
418                Err(_) => {
419                    let usage = *usage_acc.lock().expect("usage lock poisoned");
420                    Err(Error::RunTimeout(timeout).with_partial_usage(usage))
421                }
422            },
423            None => fut.await,
424        };
425
426        // Audit: run failed
427        if let Err(ref e) = result {
428            self.audit(AuditRecord {
429                agent: self.name.clone(),
430                turn: 0,
431                event_type: "run_failed".into(),
432                payload: serde_json::json!({
433                    "error": e.to_string(),
434                }),
435                usage: e.partial_usage(),
436                timestamp: chrono::Utc::now(),
437                user_id: self.audit_user_id.clone(),
438                tenant_id: self.audit_tenant_id.clone(),
439                delegation_chain: self.audit_delegation_chain.clone(),
440            })
441            .await;
442        }
443
444        // Session-end maintenance (best-effort, errors logged but not propagated).
445        if let Ok(ref mut output) = result {
446            // Consolidate related episodic memories into semantic summaries (opt-in).
447            let consolidation_usage = self.consolidate_memory_on_exit().await;
448            if consolidation_usage.input_tokens > 0 || consolidation_usage.output_tokens > 0 {
449                output.tokens_used += consolidation_usage;
450                // Add consolidation cost increment (uses static model name — consolidation
451                // always runs through the same provider, not cascade tiers).
452                if let Some(consolidation_cost) = self.estimate_cost(&consolidation_usage) {
453                    output.estimated_cost_usd =
454                        Some(output.estimated_cost_usd.unwrap_or(0.0) + consolidation_cost);
455                }
456            }
457
458            // Prune weak/old memories.
459            self.prune_memory_on_exit().await;
460        }
461
462        result
463    }
464
465    async fn execute_inner(
466        &self,
467        initial_ctx: AgentContext,
468        task: &str,
469        usage_acc: Arc<std::sync::Mutex<TokenUsage>>,
470    ) -> Result<AgentOutput, (Error, TokenUsage)> {
471        let mode = self.observability_mode;
472        let run_span = info_span!(
473            "heartbit.agent.run",
474            agent = %self.name,
475            max_turns = self.max_turns,
476            task = tracing::field::Empty,
477            model = tracing::field::Empty,
478            total_input_tokens = tracing::field::Empty,
479            total_output_tokens = tracing::field::Empty,
480            estimated_cost_usd = tracing::field::Empty,
481        );
482        if mode.includes_metrics()
483            && let Some(model) = self.provider.model_name()
484        {
485            run_span.record("model", model);
486        }
487        if mode.includes_payloads() {
488            run_span.record(
489                "task",
490                truncate_for_event(task, EVENT_MAX_PAYLOAD_BYTES).as_str(),
491            );
492        } else if mode.includes_metrics() {
493            let cut = crate::tool::builtins::floor_char_boundary(task, 256);
494            run_span.record("task", &task[..cut]);
495        }
496
497        let result = async {
498            self.emit(AgentEvent::RunStarted {
499                agent: self.name.clone(),
500                task: task.to_string(),
501            });
502
503            let mut ctx = initial_ctx;
504
505            let mut total_tool_calls = 0usize;
506            let mut total_usage = TokenUsage::default();
507            // Accumulate cost per-turn for accurate cascade pricing.
508            let mut total_cost: f64 = 0.0;
509            // Track recently used tool names (last 2 turns) for dynamic tool selection
510            let mut recently_used_tools: Vec<String> = Vec::new();
511            let mut doom_tracker = DoomLoopTracker::new();
512            let mut last_model_name: Option<String> = None;
513            // Prevents infinite compaction loops: set true after compaction,
514            // cleared at the start of each normal iteration.
515            let mut compacted_last_turn = false;
516
517            loop {
518                if ctx.current_turn() >= ctx.max_turns() {
519                    self.emit(AgentEvent::RunFailed {
520                        agent: self.name.clone(),
521                        error: format!("Max turns ({}) exceeded", ctx.max_turns()),
522                        partial_usage: total_usage,
523                    });
524                    return Err((Error::MaxTurnsExceeded(ctx.max_turns()), total_usage));
525                }
526
527                ctx.increment_turn();
528                let can_compact = !compacted_last_turn;
529                compacted_last_turn = false;
530                debug!(agent = %self.name, turn = ctx.current_turn(), "executing turn");
531                self.emit(AgentEvent::TurnStarted {
532                    agent: self.name.clone(),
533                    turn: ctx.current_turn(),
534                    max_turns: ctx.max_turns(),
535                });
536
537                // Provide turn context to stateful guardrails
538                for g in &self.guardrails {
539                    g.set_turn(ctx.current_turn());
540                }
541
542                // Session pruning: create a pruned view of messages for this LLM call
543                let mut request = if let Some(ref prune_config) = self.session_prune_config {
544                    let mut req = ctx.to_request();
545                    let (pruned_msgs, prune_stats) =
546                        pruner::prune_old_tool_results(&req.messages, prune_config);
547                    req.messages = pruned_msgs;
548                    if prune_stats.did_prune() {
549                        debug!(
550                            agent = %self.name,
551                            turn = ctx.current_turn(),
552                            pruned = prune_stats.tool_results_pruned,
553                            total = prune_stats.tool_results_total,
554                            bytes_saved = prune_stats.bytes_saved,
555                            "session pruning applied"
556                        );
557                        self.emit(AgentEvent::SessionPruned {
558                            agent: self.name.clone(),
559                            turn: ctx.current_turn(),
560                            tool_results_pruned: prune_stats.tool_results_pruned,
561                            bytes_saved: prune_stats.bytes_saved,
562                            tool_results_total: prune_stats.tool_results_total,
563                        });
564                    }
565                    req
566                } else {
567                    ctx.to_request()
568                };
569
570                // Tool profile pre-filter: narrow tool set based on query classification
571                if let Some(profile) = self.tool_profile {
572                    request.tools = tool_filter::filter_tools(&request.tools, profile);
573                }
574
575                // Dynamic tool selection: filter tools when there are too many
576                if let Some(max_tools) = self.max_tools_per_turn {
577                    request.tools = self.select_tools_for_turn(
578                        &request.tools,
579                        &request.messages,
580                        &recently_used_tools,
581                        max_tools,
582                    );
583                }
584
585                for g in &self.guardrails {
586                    if let Err(e) = g.pre_llm(&mut request).await {
587                        self.emit(AgentEvent::RunFailed {
588                            agent: self.name.clone(),
589                            error: e.to_string(),
590                            partial_usage: total_usage,
591                        });
592                        return Err((e, total_usage));
593                    }
594                }
595                // Response cache: compute key for non-streaming requests.
596                // SECURITY (F-AGENT-3): scope the cache by tenant_id+user_id
597                // when known. Otherwise a runner shared across tenants could
598                // serve tenant A's cached response to tenant B if their
599                // (system_prompt, messages, tools) tuple coincides.
600                let cache_key = if self.response_cache.is_some() && self.on_text.is_none() {
601                    let tool_names: Vec<&str> =
602                        request.tools.iter().map(|t| t.name.as_str()).collect();
603                    let namespace = match (&self.audit_tenant_id, &self.audit_user_id) {
604                        (Some(t), Some(u)) => Some(format!("{t}:{u}")),
605                        (Some(t), None) => Some(t.clone()),
606                        (None, Some(u)) => Some(format!(":{u}")),
607                        (None, None) => None,
608                    };
609                    Some(cache::ResponseCache::compute_key_scoped(
610                        &request.system,
611                        &request.messages,
612                        &tool_names,
613                        namespace.as_deref(),
614                    ))
615                } else {
616                    None
617                };
618                // Check cache before calling LLM
619                let cache_hit = cache_key
620                    .and_then(|k| self.response_cache.as_ref().and_then(|c| c.get(k)));
621                let llm_start = Instant::now();
622                let llm_span = info_span!(
623                    "heartbit.agent.llm_call",
624                    agent = %self.name,
625                    turn = ctx.current_turn(),
626                    { observability::GEN_AI_REQUEST_MODEL } = tracing::field::Empty,
627                    latency_ms = tracing::field::Empty,
628                    { observability::GEN_AI_USAGE_INPUT_TOKENS } = tracing::field::Empty,
629                    { observability::GEN_AI_USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
630                    { observability::GEN_AI_RESPONSE_FINISH_REASON } = tracing::field::Empty,
631                    tool_call_count = tracing::field::Empty,
632                    ttft_ms = tracing::field::Empty,
633                    response_text = tracing::field::Empty,
634                    cache_hit = tracing::field::Empty,
635                );
636                let llm_result = if let Some(cached) = cache_hit {
637                    tracing::debug!(
638                        agent = %self.name,
639                        turn = ctx.current_turn(),
640                        "response cache hit, skipping LLM call"
641                    );
642                    if mode.includes_metrics() {
643                        llm_span.record("cache_hit", true);
644                    }
645                    Ok(cached)
646                } else {
647                    // TTFT: wrap on_text to capture time-to-first-token
648                    let ttft_ms_inner = Arc::new(std::sync::atomic::AtomicU64::new(0));
649                    let ttft_ref = ttft_ms_inner.clone();
650                    let result = async {
651                        match &self.on_text {
652                            Some(cb) => {
653                                let ttft_ref = ttft_ref.clone();
654                                let start = llm_start;
655                                let inner_cb = cb.clone();
656                                let wrapper: Box<crate::llm::OnText> =
657                                    Box::new(move |text: &str| {
658                                        ttft_ref
659                                            .compare_exchange(
660                                                0,
661                                                start.elapsed().as_millis() as u64,
662                                                std::sync::atomic::Ordering::Relaxed,
663                                                std::sync::atomic::Ordering::Relaxed,
664                                            )
665                                            .ok();
666                                        inner_cb(text);
667                                    });
668                                self.provider.stream_complete(request, &*wrapper).await
669                            }
670                            None => self.provider.complete(request).await,
671                        }
672                    }
673                    .instrument(llm_span.clone())
674                    .await;
675                    // Store successful non-streaming responses in cache.
676                    // Only cache EndTurn responses — ToolUse responses trigger
677                    // side-effecting tool execution and must not be replayed.
678                    if let (Ok(resp), Some(key)) = (&result, cache_key)
679                        && resp.stop_reason == crate::llm::types::StopReason::EndTurn
680                        && let Some(ref c) = self.response_cache
681                    {
682                        c.put(key, resp.clone());
683                    }
684                    if mode.includes_metrics() {
685                        let ttft = ttft_ms_inner.load(std::sync::atomic::Ordering::Relaxed);
686                        llm_span.record("ttft_ms", ttft);
687                        llm_span.record("cache_hit", false);
688                    }
689                    result
690                };
691                let llm_latency_ms = llm_start.elapsed().as_millis() as u64;
692                // Record LLM call span attributes
693                if mode.includes_metrics() {
694                    llm_span.record("latency_ms", llm_latency_ms);
695                    if let Ok(ref r) = llm_result {
696                        if let Some(ref model) = r.model {
697                            llm_span.record(observability::GEN_AI_REQUEST_MODEL, model.as_str());
698                        } else if let Some(model) = self.provider.model_name() {
699                            llm_span.record(observability::GEN_AI_REQUEST_MODEL, model);
700                        }
701                    } else if let Some(model) = self.provider.model_name() {
702                        llm_span.record(observability::GEN_AI_REQUEST_MODEL, model);
703                    }
704                    if let Ok(ref r) = llm_result {
705                        llm_span.record(
706                            observability::GEN_AI_USAGE_INPUT_TOKENS,
707                            r.usage.input_tokens,
708                        );
709                        llm_span.record(
710                            observability::GEN_AI_USAGE_OUTPUT_TOKENS,
711                            r.usage.output_tokens,
712                        );
713                        llm_span.record(
714                            observability::GEN_AI_RESPONSE_FINISH_REASON,
715                            format!("{:?}", r.stop_reason).as_str(),
716                        );
717                        llm_span.record("tool_call_count", r.tool_calls().len());
718                    }
719                }
720                if mode.includes_payloads()
721                    && let Ok(ref r) = llm_result
722                {
723                    llm_span.record(
724                        "response_text",
725                        truncate_for_event(&r.text(), EVENT_MAX_PAYLOAD_BYTES).as_str(),
726                    );
727                }
728                let mut response = match llm_result {
729                    Ok(r) => r,
730                    Err(e) => {
731                        // Auto-compaction: on context overflow, summarize and retry
732                        if crate::llm::error_class::classify(&e)
733                            == crate::llm::error_class::ErrorClass::ContextOverflow
734                            && can_compact
735                            && ctx.message_count() > 5
736                        {
737                            tracing::warn!(
738                                agent = %self.name,
739                                error = %e,
740                                "context overflow detected, attempting auto-compaction"
741                            );
742                            match self.generate_summary(&ctx).await {
743                                Ok((Some(summary), summary_usage)) => {
744                                    total_usage += summary_usage;
745                                    if let Some(c) = self.estimate_cost(&summary_usage) {
746                                        total_cost += c;
747                                    }
748                                    *usage_acc.lock().expect("usage lock poisoned") = total_usage;
749                                    self.flush_to_memory_before_compaction(&ctx, 4).await;
750                                    ctx.inject_summary(summary, 4);
751                                    self.emit(AgentEvent::AutoCompactionTriggered {
752                                        agent: self.name.clone(),
753                                        turn: ctx.current_turn(),
754                                        success: true,
755                                        usage: summary_usage,
756                                    });
757                                    self.emit(AgentEvent::ContextSummarized {
758                                        agent: self.name.clone(),
759                                        turn: ctx.current_turn(),
760                                        usage: summary_usage,
761                                    });
762                                    compacted_last_turn = true;
763                                    continue;
764                                }
765                                Ok((None, summary_usage)) => {
766                                    total_usage += summary_usage;
767                                    *usage_acc.lock().expect("usage lock poisoned") = total_usage;
768                                    self.emit(AgentEvent::AutoCompactionTriggered {
769                                        agent: self.name.clone(),
770                                        turn: ctx.current_turn(),
771                                        success: false,
772                                        usage: summary_usage,
773                                    });
774                                    tracing::warn!(
775                                        agent = %self.name,
776                                        "auto-compaction summary was truncated, cannot compact"
777                                    );
778                                }
779                                Err(summary_err) => {
780                                    self.emit(AgentEvent::AutoCompactionTriggered {
781                                        agent: self.name.clone(),
782                                        turn: ctx.current_turn(),
783                                        success: false,
784                                        usage: TokenUsage::default(),
785                                    });
786                                    tracing::warn!(
787                                        agent = %self.name,
788                                        error = %summary_err,
789                                        "auto-compaction summary failed"
790                                    );
791                                }
792                            }
793                        }
794                        self.emit(AgentEvent::RunFailed {
795                            agent: self.name.clone(),
796                            error: e.to_string(),
797                            partial_usage: total_usage,
798                        });
799                        return Err((e, total_usage));
800                    }
801                };
802                total_usage += response.usage;
803
804                // Reconcile per-tenant in-flight token estimate with actual usage.
805                // Uses cumulative `total_usage` (not per-turn) so the tracker always
806                // reflects the true running total and multi-turn deltas are correct.
807                if let (Some(tracker), Some(tid)) =
808                    (&self.tenant_tracker, &self.audit_tenant_id)
809                {
810                    let actual =
811                        (total_usage.input_tokens + total_usage.output_tokens) as usize;
812                    let prev = self
813                        .cumulative_actual_tokens
814                        .swap(actual, std::sync::atomic::Ordering::SeqCst);
815                    let delta = actual as i64 - prev as i64;
816                    let scope = crate::auth::TenantScope::new(tid.clone());
817                    tracker.adjust(&scope, delta);
818                }
819
820                // Per-turn cost: prefer response.model (cascade) over static model_name()
821                let turn_model = response
822                    .model
823                    .as_deref()
824                    .or_else(|| self.provider.model_name());
825                if let Some(model) = turn_model {
826                    last_model_name = Some(model.to_string());
827                    if let Some(cost) =
828                        crate::llm::pricing::estimate_cost(model, &response.usage)
829                    {
830                        total_cost += cost;
831                    }
832                }
833                // Update shared accumulator so RunTimeout can retrieve partial usage
834                *usage_acc.lock().expect("usage lock poisoned") = total_usage;
835
836                // Check token budget
837                if let Some(max) = self.max_total_tokens {
838                    let used = total_usage.total();
839                    if used > max {
840                        self.emit(AgentEvent::BudgetExceeded {
841                            agent: self.name.clone(),
842                            used,
843                            limit: max,
844                            partial_usage: total_usage,
845                        });
846                        return Err((
847                            Error::BudgetExceeded { used, limit: max },
848                            total_usage,
849                        ));
850                    }
851                }
852
853                let mut tool_calls = response.tool_calls();
854
855                // SECURITY (F-AGENT-1): repair Levenshtein-close typos in tool names
856                // BEFORE permissions and pre_tool guardrails see them. Otherwise an
857                // LLM could emit `bask` to bypass a `bash` deny-rule and have it
858                // silently dispatched to `bash` later. We mutate `call.name` here
859                // and emit a `ToolNameRepaired` event so the audit trail records
860                // the substitution. The repair only fires for unknown names; exact
861                // matches are untouched.
862                for call in tool_calls.iter_mut() {
863                    if !self.tools.contains_key(&call.name)
864                        && let Some(repaired) = self.find_closest_tool(&call.name, 2)
865                    {
866                        let repaired = repaired.to_string();
867                        tracing::warn!(
868                            agent = %self.name,
869                            original = %call.name,
870                            repaired = %repaired,
871                            "tool name repaired via Levenshtein match (pre-policy)"
872                        );
873                        self.emit(AgentEvent::ToolNameRepaired {
874                            agent: self.name.clone(),
875                            original: call.name.clone(),
876                            repaired: repaired.clone(),
877                        });
878                        call.name = repaired;
879                    }
880                }
881
882                // Tool-call cap: reject turns that exceed max_tool_calls_per_turn.
883                // Checked before dispatch so no tools are executed on a capped turn.
884                if let Some(cap) = self.max_tool_calls_per_turn
885                    && tool_calls.len() as u32 > cap
886                {
887                    let err = Error::Agent(format!(
888                        "tool-call cap exceeded: turn produced {} calls, max is {cap}",
889                        tool_calls.len()
890                    ));
891                    self.emit(AgentEvent::RunFailed {
892                        agent: self.name.clone(),
893                        error: err.to_string(),
894                        partial_usage: total_usage,
895                    });
896                    return Err((err, total_usage));
897                }
898
899                self.emit(AgentEvent::LlmResponse {
900                    agent: self.name.clone(),
901                    turn: ctx.current_turn(),
902                    usage: response.usage,
903                    stop_reason: response.stop_reason,
904                    tool_call_count: tool_calls.len(),
905                    text: truncate_for_event(&response.text(), EVENT_MAX_PAYLOAD_BYTES),
906                    latency_ms: llm_latency_ms,
907                    model: response
908                        .model
909                        .clone()
910                        .or_else(|| self.provider.model_name().map(|s| s.to_string())),
911                    time_to_first_token_ms: 0,
912                });
913
914                // Audit: LLM response (untruncated)
915                self.audit(AuditRecord {
916                    agent: self.name.clone(),
917                    turn: ctx.current_turn(),
918                    event_type: "llm_response".into(),
919                    payload: serde_json::json!({
920                        "text": response.text(),
921                        "stop_reason": format!("{:?}", response.stop_reason),
922                        "tool_call_count": tool_calls.len(),
923                        "latency_ms": llm_latency_ms,
924                        "model": response.model.as_deref()
925                            .or_else(|| self.provider.model_name()),
926                    }),
927                    usage: response.usage,
928                    timestamp: chrono::Utc::now(),
929                user_id: self.audit_user_id.clone(),
930                tenant_id: self.audit_tenant_id.clone(),
931                delegation_chain: self.audit_delegation_chain.clone(),
932                })
933                .await;
934
935                // post_llm guardrail: inspect response, first Deny discards it.
936                // When denied, we insert a synthetic assistant message before the
937                // denial feedback to maintain the alternating user/assistant message
938                // invariant required by the Anthropic API.
939                let mut post_llm_denied = false;
940                for g in &self.guardrails {
941                    match g
942                        .post_llm(&mut response)
943                        .await
944                        .map_err(|e| (e, total_usage))?
945                    {
946                        GuardAction::Allow => {}
947                        GuardAction::Warn { reason } => {
948                            self.emit(AgentEvent::GuardrailWarned {
949                                agent: self.name.clone(),
950                                hook: "post_llm".into(),
951                                reason: reason.clone(),
952                                tool_name: None,
953                            });
954                            self.audit(AuditRecord {
955                                agent: self.name.clone(),
956                                turn: ctx.current_turn(),
957                                event_type: "guardrail_warned".into(),
958                                payload: serde_json::json!({
959                                    "hook": "post_llm",
960                                    "reason": reason,
961                                }),
962                                usage: TokenUsage::default(),
963                                timestamp: chrono::Utc::now(),
964                user_id: self.audit_user_id.clone(),
965                tenant_id: self.audit_tenant_id.clone(),
966                delegation_chain: self.audit_delegation_chain.clone(),
967                            })
968                            .await;
969                            // Continue — do NOT discard the response
970                        }
971                        GuardAction::Deny { reason } => {
972                            self.emit(AgentEvent::GuardrailDenied {
973                                agent: self.name.clone(),
974                                hook: "post_llm".into(),
975                                reason: reason.clone(),
976                                tool_name: None,
977                            });
978                            // Audit: guardrail denied
979                            self.audit(AuditRecord {
980                                agent: self.name.clone(),
981                                turn: ctx.current_turn(),
982                                event_type: "guardrail_denied".into(),
983                                payload: serde_json::json!({
984                                    "hook": "post_llm",
985                                    "reason": reason,
986                                }),
987                                usage: TokenUsage::default(),
988                                timestamp: chrono::Utc::now(),
989                user_id: self.audit_user_id.clone(),
990                tenant_id: self.audit_tenant_id.clone(),
991                delegation_chain: self.audit_delegation_chain.clone(),
992                            })
993                            .await;
994                            // Maintain alternating roles: assistant placeholder, then user denial
995                            ctx.add_assistant_message(Message {
996                                role: crate::llm::types::Role::Assistant,
997                                content: vec![ContentBlock::Text {
998                                    text: "[Response denied by guardrail]".into(),
999                                }],
1000                            });
1001                            ctx.add_user_message(format!(
1002                            "[Guardrail denied your previous response: {reason}. Please try again.]"
1003                        ));
1004                            post_llm_denied = true;
1005                            break;
1006                        }
1007                        GuardAction::Kill { reason } => {
1008                            self.emit(AgentEvent::KillSwitchActivated {
1009                                agent: self.name.clone(),
1010                                reason: reason.clone(),
1011                                guardrail_name: g.name().to_string(),
1012                            });
1013                            self.audit(AuditRecord {
1014                                agent: self.name.clone(),
1015                                turn: ctx.current_turn(),
1016                                event_type: "guardrail_killed".into(),
1017                                payload: serde_json::json!({
1018                                    "hook": "post_llm",
1019                                    "reason": reason,
1020                                }),
1021                                usage: TokenUsage::default(),
1022                                timestamp: chrono::Utc::now(),
1023                                user_id: self.audit_user_id.clone(),
1024                                tenant_id: self.audit_tenant_id.clone(),
1025                                delegation_chain: self.audit_delegation_chain.clone(),
1026                            })
1027                            .await;
1028                            return Err((
1029                                Error::KillSwitch(reason),
1030                                total_usage,
1031                            ));
1032                        }
1033                    }
1034                }
1035                if post_llm_denied {
1036                    continue;
1037                }
1038
1039                // Add assistant message to context (move content, avoid clone)
1040                ctx.add_assistant_message(Message {
1041                    role: crate::llm::types::Role::Assistant,
1042                    content: response.content,
1043                });
1044
1045                // Evict base64 media from older messages to prevent context bloat.
1046                ctx.evict_media();
1047
1048                // Check for structured output: if the LLM called the synthetic `__respond__` tool,
1049                // validate its input against the schema, then extract as structured result.
1050                // Count ALL tool calls in this turn (including co-submitted ones) for parity
1051                // with the Restate path, even though non-__respond__ calls are not executed.
1052                if let Some(ref schema) = self.structured_schema
1053                    && let Some(respond_call) = tool_calls
1054                        .iter()
1055                        .find(|tc| tc.name == crate::llm::types::RESPOND_TOOL_NAME)
1056                {
1057                    let structured = respond_call.input.clone();
1058
1059                    // Validate against the caller's schema before accepting.
1060                    if let Err(validation_error) =
1061                        crate::tool::validate_tool_input(schema, &structured)
1062                    {
1063                        // Count the failed attempt and feed the validation error
1064                        // back to the LLM so it can self-correct on the next turn.
1065                        total_tool_calls += tool_calls.len();
1066                        tracing::warn!(
1067                            agent = %self.name,
1068                            error = %validation_error,
1069                            "structured output failed schema validation, retrying"
1070                        );
1071                        ctx.add_tool_results(vec![ToolResult {
1072                            tool_use_id: respond_call.id.clone(),
1073                            content: format!(
1074                                "Structured output validation failed: {validation_error}. \
1075                                 Please fix the output to match the schema and call __respond__ again."
1076                            ),
1077                            is_error: true,
1078                        }]);
1079                        continue;
1080                    }
1081
1082                    total_tool_calls += tool_calls.len();
1083                    let text = serde_json::to_string_pretty(&structured)
1084                        .unwrap_or_else(|_| structured.to_string());
1085                    self.emit(AgentEvent::RunCompleted {
1086                        agent: self.name.clone(),
1087                        total_usage,
1088                        tool_calls_made: total_tool_calls,
1089                    });
1090                    // Audit: run completed (structured)
1091                    let preview_end =
1092                        crate::tool::builtins::floor_char_boundary(&text, 1000);
1093                    self.audit(AuditRecord {
1094                        agent: self.name.clone(),
1095                        turn: ctx.current_turn(),
1096                        event_type: "run_completed".into(),
1097                        payload: serde_json::json!({
1098                            "total_tool_calls": total_tool_calls,
1099                            "result_preview": &text[..preview_end],
1100                        }),
1101                        usage: total_usage,
1102                        timestamp: chrono::Utc::now(),
1103                user_id: self.audit_user_id.clone(),
1104                tenant_id: self.audit_tenant_id.clone(),
1105                delegation_chain: self.audit_delegation_chain.clone(),
1106                    })
1107                    .await;
1108                    return Ok(AgentOutput {
1109                        result: text,
1110                        tool_calls_made: total_tool_calls,
1111                        tokens_used: total_usage,
1112                        structured: Some(structured),
1113                        estimated_cost_usd: if total_cost > 0.0 {
1114                            Some(total_cost)
1115                        } else {
1116                            self.estimate_cost(&total_usage)
1117                        },
1118                        model_name: last_model_name.clone(),
1119                    });
1120                }
1121
1122                if tool_calls.is_empty() {
1123                    // Check for truncation
1124                    if response.stop_reason == StopReason::MaxTokens {
1125                        self.emit(AgentEvent::RunFailed {
1126                            agent: self.name.clone(),
1127                            error: "Response truncated (max_tokens reached)".into(),
1128                            partial_usage: total_usage,
1129                        });
1130                        return Err((Error::Truncated, total_usage));
1131                    }
1132
1133                    // Structured output was requested but LLM returned text without
1134                    // calling __respond__. This is a contract violation — the caller
1135                    // expects structured output but would get None silently.
1136                    if self.structured_schema.is_some() {
1137                        self.emit(AgentEvent::RunFailed {
1138                            agent: self.name.clone(),
1139                            error: "LLM returned text without calling __respond__".into(),
1140                            partial_usage: total_usage,
1141                        });
1142                        return Err((
1143                            Error::Agent(
1144                                "LLM returned text without calling __respond__; \
1145                             structured output was not produced"
1146                                    .into(),
1147                            ),
1148                            total_usage,
1149                        ));
1150                    }
1151
1152                    // Interactive mode: if on_input is set, ask for more input
1153                    // instead of returning. This enables multi-turn conversations.
1154                    if let Some(ref on_input) = self.on_input
1155                        && let Some(next_message) = on_input().await
1156                        && !next_message.trim().is_empty()
1157                    {
1158                        ctx.add_user_message(next_message);
1159                        continue;
1160                    }
1161
1162                    self.emit(AgentEvent::RunCompleted {
1163                        agent: self.name.clone(),
1164                        total_usage,
1165                        tool_calls_made: total_tool_calls,
1166                    });
1167                    let result_text =
1168                        ctx.last_assistant_text().unwrap_or_default().to_string();
1169                    // Audit: run completed
1170                    let preview_end =
1171                        crate::tool::builtins::floor_char_boundary(&result_text, 1000);
1172                    self.audit(AuditRecord {
1173                        agent: self.name.clone(),
1174                        turn: ctx.current_turn(),
1175                        event_type: "run_completed".into(),
1176                        payload: serde_json::json!({
1177                            "total_tool_calls": total_tool_calls,
1178                            "result_preview": &result_text[..preview_end],
1179                        }),
1180                        usage: total_usage,
1181                        timestamp: chrono::Utc::now(),
1182                user_id: self.audit_user_id.clone(),
1183                tenant_id: self.audit_tenant_id.clone(),
1184                delegation_chain: self.audit_delegation_chain.clone(),
1185                    })
1186                    .await;
1187                    return Ok(AgentOutput {
1188                        result: result_text,
1189                        tool_calls_made: total_tool_calls,
1190                        tokens_used: total_usage,
1191                        structured: None,
1192                        estimated_cost_usd: if total_cost > 0.0 {
1193                            Some(total_cost)
1194                        } else {
1195                            self.estimate_cost(&total_usage)
1196                        },
1197                        model_name: last_model_name.clone(),
1198                    });
1199                }
1200
1201                // Permission rules + human-in-the-loop approval.
1202                //
1203                // When permission rules are set, each call is evaluated individually:
1204                //   Allow → execute without asking
1205                //   Deny  → error result
1206                //   Ask   → deferred to `on_approval` callback
1207                // Calls with no matching rule are also deferred to `on_approval`.
1208                //
1209                // When no rules are set, the legacy behavior applies: if `on_approval`
1210                // is set, the entire batch is sent for approval.
1211                let (tool_calls, permission_denied_results) = if self.has_permission_rules() {
1212                    let mut allowed = Vec::new();
1213                    let mut denied = Vec::new();
1214                    let mut needs_approval = Vec::new();
1215
1216                    for call in tool_calls {
1217                        match self.eval_permission(&call.name, &call.input) {
1218                            Some(permission::PermissionAction::Allow) => {
1219                                allowed.push(call);
1220                            }
1221                            Some(permission::PermissionAction::Deny) => {
1222                                debug!(
1223                                    agent = %self.name,
1224                                    tool = %call.name,
1225                                    "tool call denied by permission rule"
1226                                );
1227                                denied.push(ToolResult::error(
1228                                    call.id.clone(),
1229                                    format!("Permission denied for tool '{}'", call.name),
1230                                ));
1231                            }
1232                            Some(permission::PermissionAction::Ask) | None => {
1233                                needs_approval.push(call);
1234                            }
1235                        }
1236                    }
1237
1238                    // Ask for the remaining calls via the on_approval callback
1239                    if !needs_approval.is_empty() {
1240                        if let Some(ref cb) = self.on_approval {
1241                            self.emit(AgentEvent::ApprovalRequested {
1242                                agent: self.name.clone(),
1243                                turn: ctx.current_turn(),
1244                                tool_names: needs_approval
1245                                    .iter()
1246                                    .map(|tc| tc.name.clone())
1247                                    .collect(),
1248                            });
1249                            let decision = cb(&needs_approval);
1250                            self.emit(AgentEvent::ApprovalDecision {
1251                                agent: self.name.clone(),
1252                                turn: ctx.current_turn(),
1253                                approved: decision.is_allowed(),
1254                            });
1255                            // Persist AlwaysAllow / AlwaysDeny as learned rules
1256                            if decision.is_persistent() {
1257                                self.persist_approval_decision(&needs_approval, decision);
1258                            }
1259                            if decision.is_allowed() {
1260                                allowed.extend(needs_approval);
1261                            } else {
1262                                for call in &needs_approval {
1263                                    denied.push(ToolResult::error(
1264                                        call.id.clone(),
1265                                        "Tool execution denied by human reviewer".to_string(),
1266                                    ));
1267                                }
1268                            }
1269                        } else {
1270                            // No callback → allow
1271                            allowed.extend(needs_approval);
1272                        }
1273                    }
1274
1275                    // If ALL calls were denied, add results and continue
1276                    if allowed.is_empty() && !denied.is_empty() {
1277                        total_tool_calls += denied.len();
1278                        ctx.add_tool_results(denied);
1279                        continue;
1280                    }
1281
1282                    (allowed, denied)
1283                } else if let Some(ref cb) = self.on_approval {
1284                    // Legacy path: no permission rules, batch approval callback
1285                    self.emit(AgentEvent::ApprovalRequested {
1286                        agent: self.name.clone(),
1287                        turn: ctx.current_turn(),
1288                        tool_names: tool_calls.iter().map(|tc| tc.name.clone()).collect(),
1289                    });
1290                    let decision = cb(&tool_calls);
1291                    self.emit(AgentEvent::ApprovalDecision {
1292                        agent: self.name.clone(),
1293                        turn: ctx.current_turn(),
1294                        approved: decision.is_allowed(),
1295                    });
1296                    // Persist AlwaysAllow / AlwaysDeny as learned rules
1297                    if decision.is_persistent() {
1298                        self.persist_approval_decision(&tool_calls, decision);
1299                    }
1300                    if !decision.is_allowed() {
1301                        debug!(
1302                            agent = %self.name,
1303                            "tool execution denied by approval callback"
1304                        );
1305                        let results: Vec<ToolResult> = tool_calls
1306                            .iter()
1307                            .map(|tc| {
1308                                ToolResult::error(
1309                                    tc.id.clone(),
1310                                    "Tool execution denied by human reviewer".to_string(),
1311                                )
1312                            })
1313                            .collect();
1314                        total_tool_calls += tool_calls.len();
1315                        ctx.add_tool_results(results);
1316                        continue;
1317                    }
1318                    (tool_calls, Vec::new())
1319                } else {
1320                    (tool_calls, Vec::new())
1321                };
1322
1323                // Doom loop detection: if the same set of tool calls is repeated
1324                // for N consecutive turns, return error results instead of executing.
1325                if let Some(threshold) = self.max_identical_tool_calls {
1326                    let (exact, fuzzy) = doom_tracker.record(
1327                        &tool_calls,
1328                        threshold,
1329                        self.max_fuzzy_identical_tool_calls,
1330                    );
1331                    if exact {
1332                        debug!(
1333                            agent = %self.name,
1334                            count = doom_tracker.count(),
1335                            "doom loop detected, returning error results"
1336                        );
1337                        self.emit(AgentEvent::DoomLoopDetected {
1338                            agent: self.name.clone(),
1339                            turn: ctx.current_turn(),
1340                            consecutive_count: doom_tracker.count(),
1341                            tool_names: tool_calls
1342                                .iter()
1343                                .map(|tc| tc.name.clone())
1344                                .collect(),
1345                        });
1346                        let results: Vec<ToolResult> = tool_calls
1347                            .iter()
1348                            .map(|tc| {
1349                                ToolResult::error(
1350                                    tc.id.clone(),
1351                                    format!(
1352                                        "Doom loop detected: identical tool calls repeated {} \
1353                                         times consecutively. Try a different approach.",
1354                                        doom_tracker.count()
1355                                    ),
1356                                )
1357                            })
1358                            .collect();
1359                        total_tool_calls += tool_calls.len();
1360                        ctx.add_tool_results(results);
1361                        continue;
1362                    } else if fuzzy {
1363                        debug!(
1364                            agent = %self.name,
1365                            count = doom_tracker.fuzzy_count(),
1366                            "fuzzy doom loop detected, returning error results"
1367                        );
1368                        self.emit(AgentEvent::FuzzyDoomLoopDetected {
1369                            agent: self.name.clone(),
1370                            turn: ctx.current_turn(),
1371                            consecutive_count: doom_tracker.fuzzy_count(),
1372                            tool_names: tool_calls
1373                                .iter()
1374                                .map(|tc| tc.name.clone())
1375                                .collect(),
1376                        });
1377                        let results: Vec<ToolResult> = tool_calls
1378                            .iter()
1379                            .map(|tc| {
1380                                ToolResult::error(
1381                                    tc.id.clone(),
1382                                    format!(
1383                                        "Fuzzy doom loop detected: same tools with different \
1384                                         inputs repeated {} times consecutively. Try a \
1385                                         completely different approach.",
1386                                        doom_tracker.fuzzy_count()
1387                                    ),
1388                                )
1389                            })
1390                            .collect();
1391                        total_tool_calls += tool_calls.len();
1392                        ctx.add_tool_results(results);
1393                        continue;
1394                    }
1395                }
1396
1397                // pre_tool guardrail: per-call fine-grained filter
1398                let (allowed_calls, denied_results) = if self.guardrails.is_empty() {
1399                    (tool_calls, Vec::new())
1400                } else {
1401                    let mut allowed = Vec::new();
1402                    let mut denied = Vec::new();
1403                    for call in tool_calls {
1404                        let mut call_denied = false;
1405                        for g in &self.guardrails {
1406                            match g.pre_tool(&call).await.map_err(|e| (e, total_usage))? {
1407                                GuardAction::Allow => {}
1408                                GuardAction::Warn { reason } => {
1409                                    self.emit(AgentEvent::GuardrailWarned {
1410                                        agent: self.name.clone(),
1411                                        hook: "pre_tool".into(),
1412                                        reason: reason.clone(),
1413                                        tool_name: Some(call.name.clone()),
1414                                    });
1415                                    self.audit(AuditRecord {
1416                                        agent: self.name.clone(),
1417                                        turn: ctx.current_turn(),
1418                                        event_type: "guardrail_warned".into(),
1419                                        payload: serde_json::json!({
1420                                            "hook": "pre_tool",
1421                                            "reason": reason,
1422                                            "tool_name": call.name,
1423                                        }),
1424                                        usage: TokenUsage::default(),
1425                                        timestamp: chrono::Utc::now(),
1426                user_id: self.audit_user_id.clone(),
1427                tenant_id: self.audit_tenant_id.clone(),
1428                delegation_chain: self.audit_delegation_chain.clone(),
1429                                    })
1430                                    .await;
1431                                    // Continue — do NOT deny the tool call
1432                                }
1433                                GuardAction::Deny { reason } => {
1434                                    self.emit(AgentEvent::GuardrailDenied {
1435                                        agent: self.name.clone(),
1436                                        hook: "pre_tool".into(),
1437                                        reason: reason.clone(),
1438                                        tool_name: Some(call.name.clone()),
1439                                    });
1440                                    // Audit: pre_tool guardrail denied
1441                                    self.audit(AuditRecord {
1442                                        agent: self.name.clone(),
1443                                        turn: ctx.current_turn(),
1444                                        event_type: "guardrail_denied".into(),
1445                                        payload: serde_json::json!({
1446                                            "hook": "pre_tool",
1447                                            "reason": reason,
1448                                            "tool_name": call.name,
1449                                        }),
1450                                        usage: TokenUsage::default(),
1451                                        timestamp: chrono::Utc::now(),
1452                user_id: self.audit_user_id.clone(),
1453                tenant_id: self.audit_tenant_id.clone(),
1454                delegation_chain: self.audit_delegation_chain.clone(),
1455                                    })
1456                                    .await;
1457                                    denied.push(ToolResult::error(
1458                                        call.id.clone(),
1459                                        format!("Guardrail denied: {reason}"),
1460                                    ));
1461                                    call_denied = true;
1462                                    break;
1463                                }
1464                                GuardAction::Kill { reason } => {
1465                                    self.emit(AgentEvent::KillSwitchActivated {
1466                                        agent: self.name.clone(),
1467                                        reason: reason.clone(),
1468                                        guardrail_name: g.name().to_string(),
1469                                    });
1470                                    self.audit(AuditRecord {
1471                                        agent: self.name.clone(),
1472                                        turn: ctx.current_turn(),
1473                                        event_type: "guardrail_killed".into(),
1474                                        payload: serde_json::json!({
1475                                            "hook": "pre_tool",
1476                                            "reason": reason,
1477                                            "tool_name": call.name,
1478                                        }),
1479                                        usage: TokenUsage::default(),
1480                                        timestamp: chrono::Utc::now(),
1481                                        user_id: self.audit_user_id.clone(),
1482                                        tenant_id: self.audit_tenant_id.clone(),
1483                                        delegation_chain: self.audit_delegation_chain.clone(),
1484                                    })
1485                                    .await;
1486                                    return Err((
1487                                        Error::KillSwitch(reason),
1488                                        total_usage,
1489                                    ));
1490                                }
1491                            }
1492                        }
1493                        if !call_denied {
1494                            allowed.push(call);
1495                        }
1496                    }
1497                    (allowed, denied)
1498                };
1499
1500                total_tool_calls +=
1501                    allowed_calls.len() + denied_results.len() + permission_denied_results.len();
1502                // Update recently-used tool list for dynamic tool selection
1503                recently_used_tools = allowed_calls.iter().map(|c| c.name.clone()).collect();
1504                let tool_batch_span = info_span!(
1505                    "heartbit.agent.tool_batch",
1506                    agent = %self.name,
1507                    turn = ctx.current_turn(),
1508                    tool_count = allowed_calls.len(),
1509                );
1510                let mut results = self
1511                    .execute_tools_parallel(&allowed_calls, ctx.current_turn())
1512                    .instrument(tool_batch_span)
1513                    .await;
1514                results.extend(denied_results);
1515                results.extend(permission_denied_results);
1516
1517                // LSP diagnostics: after file-modifying tools, collect diagnostics
1518                // and append to the tool result so the LLM sees errors immediately.
1519                if let Some(ref lsp) = self.lsp_manager {
1520                    self.append_lsp_diagnostics(lsp, &allowed_calls, &mut results)
1521                        .await;
1522                }
1523
1524                // Compress oversized tool outputs via LLM call
1525                if let Some(threshold) = self.tool_output_compression_threshold {
1526                    for result in &mut results {
1527                        if !result.is_error && result.content.len() > threshold {
1528                            let compressed = self
1529                                .compress_tool_output(&result.content, threshold, &mut total_usage)
1530                                .await;
1531                            result.content = compressed;
1532                        }
1533                    }
1534                    *usage_acc.lock().expect("usage lock poisoned") = total_usage;
1535                }
1536
1537                ctx.add_tool_results(results);
1538
1539                // Reflection: inject a user-role prompt that nudges the LLM to assess
1540                // tool results before deciding the next action (Reflexion/CRITIC pattern).
1541                if self.enable_reflection {
1542                    ctx.add_user_message(
1543                        "Before proceeding, briefly reflect on the tool results above:\n\
1544                     1. Did you get the information you needed?\n\
1545                     2. Are there any errors or unexpected results?\n\
1546                     3. What is the best next step?"
1547                            .to_string(),
1548                    );
1549                }
1550
1551                // Summarization: if threshold is set and context exceeds it, compress.
1552                // Guard on message count: inject_summary(keep_last_n=4) is a no-op
1553                // when total messages <= 5 (1 first + 4 kept), so skip the LLM call.
1554                if let Some(threshold) = self.summarize_threshold
1555                    && ctx.message_count() > 5
1556                    && ctx.needs_compaction(threshold)
1557                {
1558                    debug!(agent = %self.name, "context exceeds threshold, summarizing");
1559                    let summarize_span = info_span!(
1560                        "heartbit.agent.summarize",
1561                        agent = %self.name,
1562                        turn = ctx.current_turn(),
1563                    );
1564                    let (summary, summary_usage) =
1565                        match self.generate_summary(&ctx).instrument(summarize_span).await {
1566                            Ok(r) => r,
1567                            Err(e) => {
1568                                self.emit(AgentEvent::RunFailed {
1569                                    agent: self.name.clone(),
1570                                    error: e.to_string(),
1571                                    partial_usage: total_usage,
1572                                });
1573                                return Err((e, total_usage));
1574                            }
1575                        };
1576                    total_usage += summary_usage;
1577                    *usage_acc.lock().expect("usage lock poisoned") = total_usage;
1578                    if let Some(summary) = summary {
1579                        self.flush_to_memory_before_compaction(&ctx, 4).await;
1580                        ctx.inject_summary(summary, 4);
1581                        self.emit(AgentEvent::ContextSummarized {
1582                            agent: self.name.clone(),
1583                            turn: ctx.current_turn(),
1584                            usage: summary_usage,
1585                        });
1586                    }
1587                }
1588            }
1589        }
1590        .instrument(run_span.clone())
1591        .await;
1592
1593        // Record final metrics on the run span
1594        if mode.includes_metrics() {
1595            let usage = match &result {
1596                Ok(output) => &output.tokens_used,
1597                Err((_, usage)) => usage,
1598            };
1599            run_span.record("total_input_tokens", usage.input_tokens);
1600            run_span.record("total_output_tokens", usage.output_tokens);
1601            if let Ok(ref output) = result
1602                && let Some(cost) = output.estimated_cost_usd
1603            {
1604                run_span.record("estimated_cost_usd", cost);
1605            }
1606        }
1607
1608        result
1609    }
1610
1611    /// Generate a summary of the conversation so far using the LLM.
1612    ///
1613    /// Returns `(Option<summary_text>, token_usage)`. The summary is `None` if
1614    /// truncated (MaxTokens), in which case the caller should skip compaction.
1615    /// Token usage is always returned so the caller can accumulate it.
1616    async fn generate_summary(
1617        &self,
1618        ctx: &AgentContext,
1619    ) -> Result<(Option<String>, TokenUsage), Error> {
1620        let text = ctx.conversation_text();
1621        let lines: Vec<&str> = text.lines().collect();
1622
1623        // Use recursive summarization for long conversations (>20 lines)
1624        const CLUSTER_SIZE: usize = 10;
1625        if self.enable_recursive_summarization && lines.len() > CLUSTER_SIZE * 2 {
1626            return self.generate_recursive_summary(&lines, CLUSTER_SIZE).await;
1627        }
1628
1629        self.summarize_text(&text).await
1630    }
1631
1632    /// Single-shot summarization of a text block.
1633    async fn summarize_text(&self, text: &str) -> Result<(Option<String>, TokenUsage), Error> {
1634        let summary_request = CompletionRequest {
1635            system: "You are a summarization assistant. Summarize the following conversation \
1636                     concisely, preserving key facts, decisions, and tool results. \
1637                     Focus on information that would be needed to continue the conversation."
1638                .into(),
1639            messages: vec![Message::user(text.to_string())],
1640            tools: vec![],
1641            max_tokens: 1024,
1642            tool_choice: None,
1643            reasoning_effort: None,
1644        };
1645
1646        let response = self.provider.complete(summary_request).await?;
1647        let usage = response.usage;
1648        if response.stop_reason == StopReason::MaxTokens {
1649            tracing::warn!(
1650                agent = %self.name,
1651                "summarization truncated (max_tokens reached), skipping compaction"
1652            );
1653            return Ok((None, usage));
1654        }
1655        Ok((Some(response.text()), usage))
1656    }
1657
1658    /// Recursive summarization: chunk messages into clusters, summarize each,
1659    /// then summarize the combined cluster summaries.
1660    ///
1661    /// Preserves 3-5x more detail than single-shot for long conversations.
1662    async fn generate_recursive_summary(
1663        &self,
1664        lines: &[&str],
1665        cluster_size: usize,
1666    ) -> Result<(Option<String>, TokenUsage), Error> {
1667        let mut total_usage = TokenUsage::default();
1668        let mut cluster_summaries = Vec::new();
1669
1670        // Phase 1: Summarize each cluster
1671        for chunk in lines.chunks(cluster_size) {
1672            let cluster_text = chunk.join("\n");
1673            let (summary, usage) = self.summarize_text(&cluster_text).await?;
1674            total_usage += usage;
1675            match summary {
1676                Some(s) => cluster_summaries.push(s),
1677                None => {
1678                    // If any cluster summary is truncated, fall back to single-shot
1679                    let full_text = lines.join("\n");
1680                    let (summary, usage) = self.summarize_text(&full_text).await?;
1681                    total_usage += usage;
1682                    return Ok((summary, total_usage));
1683                }
1684            }
1685        }
1686
1687        // Phase 2: Combine cluster summaries into final summary
1688        let combined = format!(
1689            "Summarize the following section summaries into one cohesive summary:\n\n{}",
1690            cluster_summaries
1691                .iter()
1692                .enumerate()
1693                .map(|(i, s)| format!("Section {}:\n{}", i + 1, s))
1694                .collect::<Vec<_>>()
1695                .join("\n\n")
1696        );
1697        let (final_summary, combine_usage) = self.summarize_text(&combined).await?;
1698        total_usage += combine_usage;
1699        Ok((final_summary, total_usage))
1700    }
1701
1702    /// Build a `TenantScope` from the agent's audit identity fields.
1703    ///
1704    /// Falls back to single-tenant (empty `tenant_id`) when no audit context is set.
1705    fn memory_scope(&self) -> crate::auth::TenantScope {
1706        crate::auth::TenantScope::from_audit_fields(
1707            self.audit_tenant_id.as_deref(),
1708            self.audit_user_id.as_deref(),
1709        )
1710    }
1711
1712    /// Flush key tool results to memory before compaction.
1713    ///
1714    /// Extracts non-error tool results exceeding a minimum length from messages
1715    /// that are about to be compacted, storing them as episodic memories.
1716    async fn flush_to_memory_before_compaction(&self, ctx: &AgentContext, keep_last_n: usize) {
1717        let Some(ref memory) = self.memory else {
1718            return;
1719        };
1720
1721        let messages = ctx.messages_to_be_compacted(keep_last_n);
1722        let now = chrono::Utc::now();
1723
1724        for msg in messages {
1725            if msg.role != crate::llm::types::Role::User {
1726                continue;
1727            }
1728            for block in &msg.content {
1729                if let ContentBlock::ToolResult {
1730                    content, is_error, ..
1731                } = block
1732                {
1733                    // Skip errors and very short results
1734                    if *is_error || content.len() < 50 {
1735                        continue;
1736                    }
1737                    // Truncate very long results to a reasonable size
1738                    let stored_content = if content.len() > 500 {
1739                        format!(
1740                            "{}...",
1741                            &content[..crate::tool::builtins::floor_char_boundary(content, 500)]
1742                        )
1743                    } else {
1744                        content.clone()
1745                    };
1746                    let id = uuid::Uuid::new_v4().to_string();
1747                    let entry = crate::memory::MemoryEntry {
1748                        id,
1749                        agent: self.name.clone(),
1750                        content: stored_content,
1751                        category: "fact".into(),
1752                        tags: vec!["auto-flush".into()],
1753                        created_at: now,
1754                        last_accessed: now,
1755                        access_count: 0,
1756                        importance: 3,
1757                        memory_type: crate::memory::MemoryType::Episodic,
1758                        keywords: vec![],
1759                        summary: None,
1760                        strength: 0.8,
1761                        related_ids: vec![],
1762                        source_ids: vec![],
1763                        embedding: None,
1764                        confidentiality: crate::memory::Confidentiality::default(),
1765                        author_user_id: None,
1766                        author_tenant_id: None,
1767                    };
1768                    let scope = self.memory_scope();
1769                    if let Err(e) = memory.store(&scope, entry).await {
1770                        tracing::warn!(
1771                            agent = %self.name,
1772                            error = %e,
1773                            "failed to flush tool result to memory before compaction"
1774                        );
1775                    }
1776                }
1777            }
1778        }
1779    }
1780
1781    /// Prune weak memory entries at session end.
1782    ///
1783    /// Runs Ebbinghaus-based pruning with default thresholds. Errors are logged
1784    /// but do not fail the session — pruning is best-effort maintenance.
1785    async fn prune_memory_on_exit(&self) {
1786        let Some(ref memory) = self.memory else {
1787            return;
1788        };
1789        let scope = self.memory_scope();
1790        match crate::memory::pruning::prune_weak_entries(
1791            memory,
1792            &scope,
1793            crate::memory::pruning::DEFAULT_MIN_STRENGTH,
1794            crate::memory::pruning::default_min_age(),
1795        )
1796        .await
1797        {
1798            Ok(0) => {}
1799            Ok(n) => {
1800                tracing::debug!(agent = %self.name, pruned = n, "pruned weak memory entries at session end");
1801            }
1802            Err(e) => {
1803                tracing::warn!(agent = %self.name, error = %e, "memory pruning failed at session end");
1804            }
1805        }
1806    }
1807
1808    /// Run memory consolidation at session end (opt-in).
1809    ///
1810    /// Clusters related episodic memories by keyword overlap and merges them
1811    /// into semantic summaries via LLM. Returns accumulated token usage.
1812    async fn consolidate_memory_on_exit(&self) -> TokenUsage {
1813        if !self.consolidate_on_exit {
1814            return TokenUsage::default();
1815        }
1816        let Some(ref memory) = self.memory else {
1817            return TokenUsage::default();
1818        };
1819        let pipeline = crate::memory::consolidation::ConsolidationPipeline::new(
1820            memory.clone(),
1821            self.provider.clone(),
1822            &self.name,
1823        );
1824        let scope = self.memory_scope();
1825        match pipeline.run(&scope).await {
1826            Ok((0, _, usage)) => usage,
1827            Ok((clusters, entries, usage)) => {
1828                tracing::debug!(
1829                    agent = %self.name,
1830                    clusters,
1831                    entries,
1832                    "consolidated memories at session end"
1833                );
1834                usage
1835            }
1836            Err(e) => {
1837                tracing::warn!(
1838                    agent = %self.name,
1839                    error = %e,
1840                    "memory consolidation failed at session end"
1841                );
1842                TokenUsage::default()
1843            }
1844        }
1845    }
1846
1847    /// Select the most relevant tools for the current turn.
1848    ///
1849    /// Strategy:
1850    /// 1. Always include tools used in the last 2 turns (momentum)
1851    /// 2. Score remaining tools by keyword overlap with recent messages
1852    /// 3. Cap at `max_tools`
1853    pub(super) fn select_tools_for_turn(
1854        &self,
1855        all_tools: &[ToolDefinition],
1856        messages: &[Message],
1857        recently_used: &[String],
1858        max_tools: usize,
1859    ) -> Vec<ToolDefinition> {
1860        if all_tools.len() <= max_tools {
1861            return all_tools.to_vec();
1862        }
1863
1864        // Collect text from last 2 user/assistant messages for keyword matching
1865        let recent_text: String = messages
1866            .iter()
1867            .rev()
1868            .take(4)
1869            .flat_map(|m| m.content.iter())
1870            .filter_map(|block| match block {
1871                ContentBlock::Text { text } => Some(text.as_str()),
1872                _ => None,
1873            })
1874            .collect::<Vec<_>>()
1875            .join(" ")
1876            .to_lowercase();
1877
1878        let keywords: Vec<&str> = recent_text
1879            .split(|c: char| !c.is_alphanumeric() && c != '_')
1880            .filter(|w| w.len() > 2)
1881            .collect();
1882
1883        // Partition into pinned (always included) and candidates.
1884        // Pinned: recently-used tools + __respond__ (structured output must never be dropped).
1885        let mut selected: Vec<ToolDefinition> = Vec::new();
1886        let mut candidates: Vec<(ToolDefinition, usize)> = Vec::new();
1887
1888        for tool in all_tools {
1889            if recently_used.contains(&tool.name)
1890                || tool.name == crate::llm::types::RESPOND_TOOL_NAME
1891            {
1892                selected.push(tool.clone());
1893            } else {
1894                // Score by keyword overlap with tool name + description
1895                let tool_text = format!("{} {}", tool.name, tool.description).to_lowercase();
1896                let score = keywords
1897                    .iter()
1898                    .filter(|kw| tool_text.contains(**kw))
1899                    .count();
1900                candidates.push((tool.clone(), score));
1901            }
1902        }
1903
1904        // Sort candidates by score descending
1905        candidates.sort_by_key(|c| std::cmp::Reverse(c.1));
1906
1907        // Fill remaining slots (cap total at max_tools)
1908        let remaining = max_tools.saturating_sub(selected.len());
1909        selected.extend(candidates.into_iter().take(remaining).map(|(t, _)| t));
1910
1911        selected.truncate(max_tools);
1912        selected
1913    }
1914
1915    /// Compress a tool output using the LLM when it exceeds the threshold.
1916    ///
1917    /// Returns the original content if below threshold or on compression error.
1918    /// On success, returns the compressed text with a byte-count annotation.
1919    async fn compress_tool_output(
1920        &self,
1921        content: &str,
1922        threshold: usize,
1923        usage_acc: &mut TokenUsage,
1924    ) -> String {
1925        if content.len() < threshold {
1926            return content.to_string();
1927        }
1928        let original_len = content.len();
1929        let request = CompletionRequest {
1930            system: "Compress the following tool output, preserving all factual content, \
1931                     key values, and actionable information. Remove redundancy and formatting \
1932                     noise. Return ONLY the compressed content."
1933                .into(),
1934            messages: vec![Message::user(content.to_string())],
1935            tools: vec![],
1936            max_tokens: (self.max_tokens / 3).max(256),
1937            tool_choice: None,
1938            reasoning_effort: None,
1939        };
1940        match self.provider.complete(request).await {
1941            Ok(resp) => {
1942                *usage_acc += resp.usage;
1943                let compressed = resp.text();
1944                if compressed.is_empty() {
1945                    content.to_string()
1946                } else {
1947                    format!("{compressed}\n[compressed from {original_len} bytes]")
1948                }
1949            }
1950            Err(e) => {
1951                debug!(agent = %self.name, error = %e, "tool output compression failed, using original");
1952                content.to_string()
1953            }
1954        }
1955    }
1956
1957    /// Find the closest tool name match within a maximum edit distance.
1958    /// Returns the matching tool name if found within `max_distance`.
1959    pub(super) fn find_closest_tool(&self, name: &str, max_distance: usize) -> Option<&str> {
1960        self.tools
1961            .keys()
1962            .map(|k| (k.as_str(), levenshtein(name, k)))
1963            .filter(|(_, d)| *d <= max_distance && *d > 0)
1964            .min_by_key(|(_, d)| *d)
1965            .map(|(name, _)| name)
1966    }
1967
1968    /// After file-modifying tools, collect LSP diagnostics and append them
1969    /// to the corresponding tool results.
1970    async fn append_lsp_diagnostics(
1971        &self,
1972        lsp: &crate::lsp::LspManager,
1973        calls: &[ToolCall],
1974        results: &mut [ToolResult],
1975    ) {
1976        for (idx, call) in calls.iter().enumerate() {
1977            if !crate::lsp::is_file_modifying_tool(&call.name) {
1978                continue;
1979            }
1980            // Skip LSP diagnostics for failed tool calls — the file wasn't modified
1981            if idx < results.len() && results[idx].is_error {
1982                continue;
1983            }
1984            // Extract the file path from the tool input
1985            let path_str = match call
1986                .input
1987                .get("path")
1988                .or_else(|| call.input.get("file_path"))
1989            {
1990                Some(serde_json::Value::String(s)) => s.clone(),
1991                _ => continue,
1992            };
1993            let path = std::path::Path::new(&path_str);
1994            let diagnostics = lsp.notify_file_changed(path).await;
1995            if diagnostics.is_empty() {
1996                tracing::debug!(
1997                    agent = %self.name,
1998                    path = %path_str,
1999                    "lsp: no diagnostics for file"
2000                );
2001            } else {
2002                let formatted = crate::lsp::format_diagnostics(&path_str, &diagnostics);
2003                tracing::info!(
2004                    agent = %self.name,
2005                    path = %path_str,
2006                    count = diagnostics.len(),
2007                    "lsp-diagnostics appended to tool result"
2008                );
2009                if idx < results.len() {
2010                    results[idx].content.push('\n');
2011                    results[idx].content.push_str(&formatted);
2012                }
2013            }
2014        }
2015    }
2016
2017    /// Execute tools in parallel via JoinSet, returning results in original call order.
2018    ///
2019    /// Panicked tasks produce an error `ToolResult` so the LLM always gets a
2020    /// result for every `tool_use_id` it sent.
2021    async fn execute_tools_parallel(&self, calls: &[ToolCall], turn: usize) -> Vec<ToolResult> {
2022        let call_ids: Vec<String> = calls.iter().map(|c| c.id.clone()).collect();
2023        let call_names: Vec<String> = calls.iter().map(|c| c.name.clone()).collect();
2024        let mut join_set = tokio::task::JoinSet::new();
2025
2026        // Construct per-turn ExecutionContext from runner's audit fields.
2027        // Phase 0: workspace, credentials, audit_sink are not yet populated on
2028        // AgentRunner — leave them None until persona/credential plumbing lands.
2029        let exec_ctx = crate::ExecutionContext {
2030            tenant_id: self.audit_tenant_id.clone(),
2031            user_id: self.audit_user_id.clone(),
2032            workspace: None,
2033            credentials: None,
2034            audit_sink: None,
2035        };
2036
2037        for (idx, call) in calls.iter().enumerate() {
2038            // SECURITY (F-AGENT-1): names are already repaired upstream of the
2039            // permission and pre_tool guardrails. If the lookup fails here, the
2040            // name was unknown AND not Levenshtein-close to any tool — return a
2041            // "Tool not found" error and let the LLM correct itself. Repairing
2042            // at dispatch time would bypass the policy that just ran.
2043            let tool = self.tools.get(&call.name).cloned();
2044            let input = call.input.clone();
2045            let call_name = call.name.clone();
2046            let timeout = self.tool_timeout;
2047
2048            self.emit(AgentEvent::ToolCallStarted {
2049                agent: self.name.clone(),
2050                tool_name: call.name.clone(),
2051                tool_call_id: call.id.clone(),
2052                input: truncate_for_event(
2053                    &serde_json::to_string(&call.input).unwrap_or_default(),
2054                    EVENT_MAX_PAYLOAD_BYTES,
2055                ),
2056            });
2057
2058            // Audit: tool call (untruncated input)
2059            self.audit(AuditRecord {
2060                agent: self.name.clone(),
2061                turn,
2062                event_type: "tool_call".into(),
2063                payload: serde_json::json!({
2064                    "tool_name": call.name,
2065                    "tool_call_id": call.id,
2066                    "input": call.input,
2067                }),
2068                usage: TokenUsage::default(),
2069                timestamp: chrono::Utc::now(),
2070                user_id: self.audit_user_id.clone(),
2071                tenant_id: self.audit_tenant_id.clone(),
2072                delegation_chain: self.audit_delegation_chain.clone(),
2073            })
2074            .await;
2075
2076            // Validate input against the tool's declared schema before dispatching.
2077            // On failure, produce an error result without executing the tool.
2078            if let Some(ref t) = tool {
2079                let schema = &t.definition().input_schema;
2080                if let Err(msg) = validate_tool_input(schema, &input) {
2081                    join_set.spawn(async move { (idx, Ok(ToolOutput::error(msg)), 0u64) });
2082                    continue;
2083                }
2084            }
2085
2086            let tool_span = info_span!(
2087                "heartbit.agent.tool_call",
2088                agent = %self.name,
2089                tool_name = %call.name,
2090            );
2091            let task_ctx = exec_ctx.clone();
2092            join_set.spawn(
2093                async move {
2094                    let start = std::time::Instant::now();
2095                    let output = match tool {
2096                        Some(t) => match timeout {
2097                            Some(dur) => {
2098                                match tokio::time::timeout(dur, t.execute(&task_ctx, input)).await {
2099                                    Ok(result) => result,
2100                                    Err(_) => Ok(ToolOutput::error(format!(
2101                                        "Tool execution timed out after {}s",
2102                                        dur.as_secs_f64()
2103                                    ))),
2104                                }
2105                            }
2106                            None => t.execute(&task_ctx, input).await,
2107                        },
2108                        None => Ok(ToolOutput::error(format!("Tool not found: {call_name}"))),
2109                    };
2110                    let duration_ms = start.elapsed().as_millis() as u64;
2111                    (idx, output, duration_ms)
2112                }
2113                .instrument(tool_span),
2114            );
2115        }
2116
2117        // Collect (idx, output, duration) tuples from JoinSet
2118        let mut outputs: Vec<Option<(ToolOutput, u64)>> = vec![None; calls.len()];
2119        while let Some(result) = join_set.join_next().await {
2120            match result {
2121                Ok((idx, Ok(output), duration_ms)) => {
2122                    let output = match self.max_tool_output_bytes {
2123                        Some(max) => output.truncated(max),
2124                        None => output,
2125                    };
2126                    outputs[idx] = Some((output, duration_ms));
2127                }
2128                Ok((idx, Err(e), duration_ms)) => {
2129                    outputs[idx] = Some((ToolOutput::error(e.to_string()), duration_ms));
2130                }
2131                Err(join_err) => {
2132                    tracing::error!(error = %join_err, "tool task panicked");
2133                }
2134            }
2135        }
2136
2137        // Apply post_tool guardrails and convert to ToolResult
2138        let mut results_vec = Vec::with_capacity(calls.len());
2139        for (idx, slot) in outputs.into_iter().enumerate() {
2140            let (mut output, duration_ms) = slot
2141                .unwrap_or_else(|| (ToolOutput::error("Tool execution panicked".to_string()), 0));
2142
2143            // post_tool guardrail: each guardrail can mutate the output
2144            for g in &self.guardrails {
2145                if let Err(e) = g.post_tool(&calls[idx], &mut output).await {
2146                    self.emit(AgentEvent::GuardrailDenied {
2147                        agent: self.name.clone(),
2148                        hook: "post_tool".into(),
2149                        reason: e.to_string(),
2150                        tool_name: Some(call_names[idx].clone()),
2151                    });
2152                    // Audit: post_tool guardrail denied
2153                    self.audit(AuditRecord {
2154                        agent: self.name.clone(),
2155                        turn,
2156                        event_type: "guardrail_denied".into(),
2157                        payload: serde_json::json!({
2158                            "hook": "post_tool",
2159                            "reason": e.to_string(),
2160                            "tool_name": call_names[idx],
2161                        }),
2162                        usage: TokenUsage::default(),
2163                        timestamp: chrono::Utc::now(),
2164                        // SECURITY (F-AGENT-5): attribute the deny to the
2165                        // identity the rest of the run is attributed to. All
2166                        // other AuditRecord sites in this file pass these
2167                        // fields; this one used to set them to None, leaving
2168                        // post_tool denials unattributable cross-tenant.
2169                        user_id: self.audit_user_id.clone(),
2170                        tenant_id: self.audit_tenant_id.clone(),
2171                        delegation_chain: self.audit_delegation_chain.clone(),
2172                    })
2173                    .await;
2174                    // post_tool error: convert to error output instead of aborting
2175                    // the entire run (consistent with tool execution errors)
2176                    output = ToolOutput::error(format!("Guardrail error: {e}"));
2177                    break;
2178                }
2179            }
2180
2181            let is_error = output.is_error;
2182            self.emit(AgentEvent::ToolCallCompleted {
2183                agent: self.name.clone(),
2184                tool_name: call_names[idx].clone(),
2185                tool_call_id: call_ids[idx].clone(),
2186                is_error,
2187                duration_ms,
2188                output: truncate_for_event(&output.content, EVENT_MAX_PAYLOAD_BYTES),
2189            });
2190            // Audit: tool result (untruncated output)
2191            self.audit(AuditRecord {
2192                agent: self.name.clone(),
2193                turn,
2194                event_type: "tool_result".into(),
2195                payload: serde_json::json!({
2196                    "tool_name": call_names[idx],
2197                    "tool_call_id": call_ids[idx],
2198                    "output": output.content,
2199                    "is_error": is_error,
2200                    "duration_ms": duration_ms,
2201                }),
2202                usage: TokenUsage::default(),
2203                timestamp: chrono::Utc::now(),
2204                user_id: self.audit_user_id.clone(),
2205                tenant_id: self.audit_tenant_id.clone(),
2206                delegation_chain: self.audit_delegation_chain.clone(),
2207            })
2208            .await;
2209            results_vec.push(tool_output_to_result(call_ids[idx].clone(), output));
2210        }
2211
2212        results_vec
2213    }
2214}
2215
2216impl<P: LlmProvider> Drop for AgentRunner<P> {
2217    fn drop(&mut self) {
2218        if let (Some(tracker), Some(tid)) =
2219            (self.tenant_tracker.as_ref(), self.audit_tenant_id.as_ref())
2220        {
2221            let actual = self
2222                .cumulative_actual_tokens
2223                .load(std::sync::atomic::Ordering::SeqCst) as i64;
2224            if actual > 0 {
2225                let scope = crate::auth::TenantScope::new(tid.clone());
2226                tracker.adjust(&scope, -actual);
2227            }
2228        }
2229    }
2230}
2231
2232pub(super) fn tool_output_to_result(tool_use_id: String, output: ToolOutput) -> ToolResult {
2233    if output.is_error {
2234        ToolResult::error(tool_use_id, output.content)
2235    } else {
2236        ToolResult::success(tool_use_id, output.content)
2237    }
2238}
2239
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::sync::Arc;

    use crate::agent::tenant_tracker::TenantTokenTracker;
    use crate::auth::TenantScope;
    use crate::error::Error;
    use crate::llm::types::{
        CompletionResponse, ContentBlock, StopReason, TokenUsage, ToolDefinition,
    };
    use crate::tool::{Tool, ToolOutput};

    use super::super::test_helpers::MockProvider;
    use super::AgentRunner;

    /// Trivial no-op tool so the runner can dispatch a tool_use response.
    struct NoopTool;

    impl Tool for NoopTool {
        fn definition(&self) -> ToolDefinition {
            ToolDefinition {
                name: "noop".into(),
                description: "Does nothing.".into(),
                input_schema: serde_json::json!({"type": "object", "properties": {}}),
            }
        }

        fn execute(
            &self,
            _ctx: &crate::ExecutionContext,
            _input: serde_json::Value,
        ) -> Pin<Box<dyn std::future::Future<Output = Result<ToolOutput, Error>> + Send + '_>>
        {
            Box::pin(async { Ok(ToolOutput::success("ok".to_string())) })
        }
    }

    /// Build a tool-use response so the runner loops back for a second LLM call.
    fn tool_use_response(input_tokens: u32, output_tokens: u32) -> CompletionResponse {
        CompletionResponse {
            content: vec![ContentBlock::ToolUse {
                id: "call-1".into(),
                name: "noop".into(),
                input: serde_json::json!({}),
            }],
            stop_reason: StopReason::ToolUse,
            usage: TokenUsage {
                input_tokens,
                output_tokens,
                ..Default::default()
            },
            model: None,
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn agent_runner_adjusts_tenant_tracker_per_turn() {
        let tracker = Arc::new(TenantTokenTracker::new(1_000_000));
        let scope = TenantScope::new("acme");
        // Simulate the daemon's submit-time admission check (Task 7) — drop
        // the reservation immediately, matching admission-only semantics.
        drop(tracker.reserve(&scope, 5000).unwrap());
        assert_eq!(tracker.snapshot()[0].1.in_flight, 0);

        // Build a mock provider that returns known TokenUsage in one turn.
        let provider = Arc::new(MockProvider::new(vec![MockProvider::text_response(
            "done", 100, 200,
        )]));

        let runner = AgentRunner::builder(provider)
            .name("test")
            .system_prompt("test")
            .audit_user_context("test-user", "acme")
            .tenant_tracker(tracker.clone())
            .max_turns(1)
            .build()
            .unwrap();
        let _output = runner.execute("hello").await.unwrap();

        // After one turn: cumulative_actual_tokens = 300, so adjust(+300).
        let snap = tracker.snapshot();
        assert_eq!(snap[0].1.in_flight, 300);

        // After runner Drop: in_flight returns to 0.
        drop(runner);
        let snap = tracker.snapshot();
        assert_eq!(snap[0].1.in_flight, 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn agent_runner_adjusts_tracker_cumulatively_across_turns() {
        // Two-turn test: verifies cumulative semantics (not per-turn deltas).
        // Turn 1: tool_use response (300 tokens) → runner loops.
        // Turn 2: text response (200 tokens) → runner stops.
        // Expected: in_flight = 500 (cumulative), zeroed on Drop.
        let tracker = Arc::new(TenantTokenTracker::new(1_000_000));
        let scope = TenantScope::new("acme");
        drop(tracker.reserve(&scope, 5000).unwrap());

        let provider = Arc::new(MockProvider::new(vec![
            tool_use_response(100, 200), // turn 1: +300 → 300 cumulative
            MockProvider::text_response("done", 50, 150), // turn 2: +200 → 500 cumulative
        ]));

        let runner = AgentRunner::builder(provider)
            .name("test")
            .system_prompt("test")
            .audit_user_context("test-user", "acme")
            .tenant_tracker(tracker.clone())
            .max_turns(2)
            .tool(Arc::new(NoopTool))
            .build()
            .unwrap();
        let _output = runner.execute("hello").await.unwrap();

        // After two turns: cumulative = 300 + 200 = 500.
        let snap = tracker.snapshot();
        assert_eq!(snap[0].1.in_flight, 500);

        drop(runner);
        assert_eq!(tracker.snapshot()[0].1.in_flight, 0);
    }

    #[tokio::test]
    async fn execution_context_propagates_to_tool() {
        use std::sync::Mutex;

        use crate::ExecutionContext;
        use crate::llm::types::ToolCall;

        struct CtxCapturingTool {
            captured_tenant: Arc<Mutex<Option<String>>>,
        }

        impl Tool for CtxCapturingTool {
            fn definition(&self) -> ToolDefinition {
                ToolDefinition {
                    name: "ctx_capture".into(),
                    description: "Captures the tenant_id from ExecutionContext.".into(),
                    input_schema: serde_json::json!({"type": "object"}),
                }
            }

            fn execute(
                &self,
                ctx: &ExecutionContext,
                _input: serde_json::Value,
            ) -> Pin<Box<dyn std::future::Future<Output = Result<ToolOutput, Error>> + Send + '_>>
            {
                let captured = self.captured_tenant.clone();
                let tenant = ctx.tenant_id.clone();
                Box::pin(async move {
                    *captured.lock().unwrap() = tenant;
                    Ok(ToolOutput::success("ok"))
                })
            }
        }

        let captured = Arc::new(Mutex::new(None));
        let tool = Arc::new(CtxCapturingTool {
            captured_tenant: captured.clone(),
        });

        let provider = Arc::new(MockProvider::new(vec![]));
        let runner = AgentRunner::builder(provider)
            .name("test")
            .system_prompt("test")
            .max_turns(1)
            .tools(vec![tool as Arc<dyn Tool>])
            .audit_user_context("test-user", "test-tenant")
            .build()
            .unwrap();

        let calls = vec![ToolCall {
            id: "c1".into(),
            name: "ctx_capture".into(),
            input: serde_json::json!({}),
        }];
        let results = runner.execute_tools_parallel(&calls, 0).await;

        // The tool must actually have run successfully. Otherwise the
        // captured-tenant assertion below could pass or fail for unrelated
        // reasons (e.g. schema validation rejecting the call).
        assert_eq!(results.len(), 1, "expected exactly one tool result");
        assert!(!results[0].is_error, "ctx_capture tool call returned an error");

        assert_eq!(
            captured.lock().unwrap().as_deref(),
            Some("test-tenant"),
            "tool did not receive the tenant_id from ExecutionContext"
        );
    }
}