1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use serde::{Deserialize, Serialize};
6use tracing::{Instrument, debug, info_span};
7
8use crate::error::Error;
9use crate::llm::LlmProvider;
10use crate::llm::types::{
11 CompletionRequest, ContentBlock, Message, StopReason, TokenUsage, ToolCall, ToolDefinition,
12 ToolResult,
13};
14use crate::memory::Memory;
15use crate::tool::{Tool, ToolOutput, validate_tool_input};
16use crate::util::levenshtein;
17
18use super::audit::{AuditRecord, AuditTrail};
19use super::builder::AgentRunnerBuilder;
20use super::cache;
21use super::context::{AgentContext, ContextStrategy};
22use super::doom_loop::DoomLoopTracker;
23use super::events::{AgentEvent, EVENT_MAX_PAYLOAD_BYTES, OnEvent, truncate_for_event};
24use super::guardrail::{GuardAction, Guardrail};
25use super::observability;
26use super::permission;
27use super::pruner;
28use super::tool_filter;
29
/// Callback used to obtain a follow-up user message.
///
/// Calling it produces a boxed, pinned, `Send` future that resolves to
/// `Some(text)` with the next message or `None` when there is none. The
/// callback itself must be `Send + Sync` so it can be shared behind an `Arc`.
pub type OnInput = dyn Fn() -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Option<String>> + Send>,
    > + Send
    + Sync;
36
/// Guidance text telling the model to investigate before claiming it lacks a
/// capability.
///
/// The text starts with a blank line followed by a `## Resourcefulness`
/// markdown heading, so it can be concatenated after other prompt text.
/// NOTE(review): where this constant is appended is outside this view.
pub(crate) const RESOURCEFULNESS_GUIDELINES: &str = concat!(
    "\n\n",
    "## Resourcefulness\n",
    "Before claiming you cannot do something or lack access to a tool:\n",
    "- Use bash to check for installed CLIs (`which <tool>`, `command -v <tool>`).\n",
    "- Search for files, configs, and resources before saying they don't exist.\n",
    "- Read documentation, help output (`<tool> --help`), and man pages when unsure.\n",
    "- Try alternative approaches when the first attempt fails.\n",
    "Never say \"I don't have access\" or \"I can't\" without evidence. Investigate first.",
);
48
/// Result of a completed agent run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentOutput {
    /// Final text of the run. For structured runs this is the pretty-printed
    /// JSON of `structured`; otherwise it is the last assistant text.
    pub result: String,
    /// Number of tool calls counted over the whole run.
    pub tool_calls_made: usize,
    /// Token usage accumulated over the run.
    pub tokens_used: TokenUsage,
    /// The validated structured value when the run ended through the
    /// structured-output path; `None` for plain-text completions.
    pub structured: Option<serde_json::Value>,
    /// Estimated cost in USD, when one could be computed. Omitted from the
    /// serialized form when `None` and defaulted to `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub estimated_cost_usd: Option<f64>,
    /// Name of the model reported for the most recent LLM response, if known.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
}
73
74impl AgentOutput {
75 pub(crate) fn accumulate_into(
77 &self,
78 total_usage: &mut TokenUsage,
79 total_tool_calls: &mut usize,
80 total_cost: &mut Option<f64>,
81 ) {
82 *total_usage += self.tokens_used;
83 *total_tool_calls += self.tool_calls_made;
84 if let Some(cost) = self.estimated_cost_usd {
85 *total_cost.get_or_insert(0.0) += cost;
86 }
87 }
88}
89
/// Drives a multi-turn LLM agent loop against a provider `P`.
///
/// Instances are assembled by `AgentRunnerBuilder` (see `AgentRunner::builder`).
/// The field notes below describe how each value is used by the run loop in
/// this file; fields whose consumer is not visible here are marked as such.
pub struct AgentRunner<P: LlmProvider> {
    /// LLM backend; the run loop calls `complete` / `stream_complete` and
    /// `model_name` on it.
    pub(super) provider: Arc<P>,
    /// Agent name attached to events, tracing spans and audit records.
    pub(super) name: String,
    /// System prompt handed to the `AgentContext` at the start of a run.
    pub(super) system_prompt: String,
    /// Registered tools keyed by name; used to check whether a tool name
    /// requested by the model exists.
    pub(super) tools: HashMap<String, Arc<dyn Tool>>,
    /// Tool definitions cloned into each new `AgentContext`.
    pub(super) tool_defs: Vec<ToolDefinition>,
    /// Turn limit forwarded to the context; the run fails with
    /// `Error::MaxTurnsExceeded` once it is reached.
    pub(super) max_turns: usize,
    /// Forwarded to the context via `with_max_tokens`.
    pub(super) max_tokens: u32,
    /// Forwarded to the context via `with_context_strategy`.
    pub(super) context_strategy: ContextStrategy,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) summarize_threshold: Option<u32>,
    /// When set, LLM calls use `stream_complete` and text is forwarded to
    /// this callback; the response cache is bypassed in that case.
    pub(super) on_text: Option<Arc<crate::llm::OnText>>,
    /// Consulted for tool calls that require approval before execution.
    pub(super) on_approval: Option<Arc<crate::llm::OnApproval>>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) tool_timeout: Option<Duration>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) max_tool_output_bytes: Option<usize>,
    /// When set, the input of a `__respond__` tool call is validated against
    /// this schema and returned as structured output; a reply without tool
    /// calls then fails the run.
    pub(super) structured_schema: Option<serde_json::Value>,
    /// Sink for `AgentEvent`s emitted during a run.
    pub(super) on_event: Option<Arc<OnEvent>>,
    /// Guardrails whose `set_turn`, `pre_llm` and `post_llm` hooks are invoked
    /// each turn.
    pub(super) guardrails: Vec<Arc<dyn Guardrail>>,
    /// Awaited when the model replies without tool calls; a non-blank message
    /// is added as a user message and the loop continues.
    pub(super) on_input: Option<Arc<OnInput>>,
    /// Limit applied to the whole run via `tokio::time::timeout`; exceeding it
    /// yields `Error::RunTimeout`.
    pub(super) run_timeout: Option<Duration>,
    /// Forwarded to the context via `with_reasoning_effort`.
    pub(super) reasoning_effort: Option<crate::llm::types::ReasoningEffort>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) enable_reflection: bool,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) tool_output_compression_threshold: Option<usize>,
    /// When set, the request's tool list is passed through
    /// `select_tools_for_turn` with this cap.
    pub(super) max_tools_per_turn: Option<usize>,
    /// When set, the request's tool list is filtered by
    /// `tool_filter::filter_tools`.
    pub(super) tool_profile: Option<tool_filter::ToolProfile>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) max_identical_tool_calls: Option<u32>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) max_fuzzy_identical_tool_calls: Option<u32>,
    /// The run fails if a single LLM response contains more tool calls than
    /// this.
    pub(super) max_tool_calls_per_turn: Option<u32>,
    /// Rules evaluated for every tool call; extended at runtime when an
    /// approval decision is persistent.
    pub(super) permission_rules: parking_lot::RwLock<permission::PermissionRuleset>,
    /// Optional store that also receives rules created from persistent
    /// approval decisions.
    pub(super) learned_permissions: Option<Arc<std::sync::Mutex<permission::LearnedPermissions>>>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) lsp_manager: Option<Arc<crate::lsp::LspManager>>,
    /// When set, old tool results are pruned from each request before it is
    /// sent.
    pub(super) session_prune_config: Option<pruner::SessionPruneConfig>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) memory: Option<Arc<dyn Memory>>,
    // NOTE(review): consumer not visible in this part of the file.
    pub(super) enable_recursive_summarization: bool,
    // NOTE(review): presumably checked inside `consolidate_memory_on_exit`,
    // which is called after every successful run — confirm.
    pub(super) consolidate_on_exit: bool,
    /// Controls which tracing span fields are recorded (metrics and/or
    /// payloads).
    pub(super) observability_mode: observability::ObservabilityMode,
    /// Token budget for a run; exceeding it yields `Error::BudgetExceeded`.
    pub(super) max_total_tokens: Option<u64>,
    /// In `MetadataOnly` mode, audit payloads are stripped of content before
    /// being recorded.
    pub(super) audit_mode: super::audit::AuditMode,
    /// Destination for audit records; nothing is recorded when `None`.
    pub(super) audit_trail: Option<Arc<dyn AuditTrail>>,
    /// Attached to audit records; also part of the response-cache namespace.
    pub(super) audit_user_id: Option<String>,
    /// Attached to audit records; also part of the response-cache namespace
    /// and the scope used with `tenant_tracker`.
    pub(super) audit_tenant_id: Option<String>,
    /// Attached to every audit record.
    pub(super) audit_delegation_chain: Vec<String>,
    /// Cache of LLM responses; consulted only when `on_text` is unset, and
    /// only responses that stopped with `EndTurn` are stored.
    pub(super) response_cache: Option<cache::ResponseCache>,
    /// When set together with `audit_tenant_id`, receives the token delta
    /// after each LLM response.
    pub(super) tenant_tracker: Option<Arc<crate::agent::tenant_tracker::TenantTokenTracker>>,
    /// Last input+output token total reported to `tenant_tracker`; swapped on
    /// each report to compute the delta.
    pub(super) cumulative_actual_tokens: std::sync::atomic::AtomicUsize,
}
198
199impl<P: LlmProvider> AgentRunner<P> {
200 pub fn builder(provider: Arc<P>) -> AgentRunnerBuilder<P> {
223 AgentRunnerBuilder {
224 provider,
225 name: "agent".into(),
226 system_prompt: String::new(),
227 tools: Vec::new(),
228 max_turns: 10,
229 max_tokens: 4096,
230 context_strategy: None,
231 summarize_threshold: None,
232 memory: None,
233 knowledge_base: None,
234 on_text: None,
235 on_approval: None,
236 tool_timeout: None,
237 max_tool_output_bytes: None,
238 structured_schema: None,
239 on_event: None,
240 guardrails: Vec::new(),
241 on_question: None,
242 on_input: None,
243 run_timeout: None,
244 reasoning_effort: None,
245 enable_reflection: false,
246 tool_output_compression_threshold: None,
247 max_tools_per_turn: None,
248 tool_profile: None,
249 max_identical_tool_calls: None,
250 max_fuzzy_identical_tool_calls: None,
251 max_tool_calls_per_turn: None,
252 permission_rules: permission::PermissionRuleset::default(),
253 instruction_text: None,
254 learned_permissions: None,
255 lsp_manager: None,
256 session_prune_config: None,
257 enable_recursive_summarization: false,
258 reflection_threshold: None,
259 consolidate_on_exit: false,
260 observability_mode: None,
261 workspace: None,
262 max_total_tokens: None,
263 audit_mode: super::audit::AuditMode::Full,
264 audit_trail: None,
265 audit_user_id: None,
266 audit_tenant_id: None,
267 audit_delegation_chain: Vec::new(),
268 response_cache_size: None,
269 tenant_tracker: None,
270 }
271 }
272
273 pub fn name(&self) -> &str {
275 &self.name
276 }
277
278 fn eval_permission(
280 &self,
281 tool_name: &str,
282 input: &serde_json::Value,
283 ) -> Option<permission::PermissionAction> {
284 self.permission_rules.read().evaluate(tool_name, input)
285 }
286
287 fn has_permission_rules(&self) -> bool {
289 !self.permission_rules.read().is_empty()
290 }
291
292 fn emit(&self, event: AgentEvent) {
293 if let Some(ref cb) = self.on_event {
294 cb(event);
295 }
296 }
297
298 async fn audit(&self, mut record: AuditRecord) {
300 if let Some(ref trail) = self.audit_trail {
301 if self.audit_mode == super::audit::AuditMode::MetadataOnly {
302 let payload = std::mem::take(&mut record.payload);
305 record.payload = super::audit::strip_content_owned(payload);
306 }
307 if let Err(e) = trail.record(record).await {
308 tracing::warn!(error = %e, "audit record failed");
309 }
310 }
311 }
312
313 fn persist_approval_decision(
319 &self,
320 tool_calls: &[ToolCall],
321 decision: crate::llm::ApprovalDecision,
322 ) {
323 let action = if decision.is_allowed() {
324 permission::PermissionAction::Allow
325 } else {
326 permission::PermissionAction::Deny
327 };
328 let mut seen = std::collections::HashSet::new();
330 let mut new_rules = Vec::new();
331 for tc in tool_calls {
332 if seen.insert(tc.name.clone()) {
333 new_rules.push(permission::PermissionRule {
334 tool: tc.name.clone(),
335 pattern: "*".into(),
336 action,
337 });
338 }
339 }
340 self.permission_rules.write().append_rules(&new_rules);
343 if let Some(ref learned) = self.learned_permissions {
345 for rule in new_rules {
346 if let Ok(mut guard) = learned.lock()
347 && let Err(e) = guard.add_rule(rule)
348 {
349 tracing::warn!(
350 error = %e,
351 "failed to persist learned permission rule"
352 );
353 }
354 }
355 }
356 }
357
358 fn estimate_cost(&self, usage: &TokenUsage) -> Option<f64> {
360 self.provider
361 .model_name()
362 .and_then(|model| crate::llm::pricing::estimate_cost(model, usage))
363 }
364
365 pub async fn execute(&self, task: &str) -> Result<AgentOutput, Error> {
367 let ctx = AgentContext::new(&self.system_prompt, task, self.tool_defs.clone())
368 .with_max_turns(self.max_turns)
369 .with_max_tokens(self.max_tokens)
370 .with_context_strategy(self.context_strategy.clone())
371 .with_reasoning_effort(self.reasoning_effort);
372 self.execute_with_context(ctx, task).await
373 }
374
375 pub async fn execute_with_content(
377 &self,
378 content: Vec<ContentBlock>,
379 ) -> Result<AgentOutput, Error> {
380 let task_summary: String = content
382 .iter()
383 .filter_map(|b| match b {
384 ContentBlock::Text { text } => Some(text.as_str()),
385 _ => None,
386 })
387 .collect::<Vec<_>>()
388 .join(" ");
389
390 let ctx = AgentContext::from_content(&self.system_prompt, content, self.tool_defs.clone())
391 .with_max_turns(self.max_turns)
392 .with_max_tokens(self.max_tokens)
393 .with_context_strategy(self.context_strategy.clone())
394 .with_reasoning_effort(self.reasoning_effort);
395 self.execute_with_context(ctx, &task_summary).await
396 }
397
398 async fn execute_with_context(
399 &self,
400 ctx: AgentContext,
401 task_description: &str,
402 ) -> Result<AgentOutput, Error> {
403 let usage_acc = Arc::new(std::sync::Mutex::new(TokenUsage::default()));
406 let fut = {
407 let acc = usage_acc.clone();
408 async move {
409 match self.execute_inner(ctx, task_description, acc).await {
410 Ok(output) => Ok(output),
411 Err((e, usage)) => Err(e.with_partial_usage(usage)),
412 }
413 }
414 };
415 let mut result = match self.run_timeout {
416 Some(timeout) => match tokio::time::timeout(timeout, fut).await {
417 Ok(result) => result,
418 Err(_) => {
419 let usage = *usage_acc.lock().expect("usage lock poisoned");
420 Err(Error::RunTimeout(timeout).with_partial_usage(usage))
421 }
422 },
423 None => fut.await,
424 };
425
426 if let Err(ref e) = result {
428 self.audit(AuditRecord {
429 agent: self.name.clone(),
430 turn: 0,
431 event_type: "run_failed".into(),
432 payload: serde_json::json!({
433 "error": e.to_string(),
434 }),
435 usage: e.partial_usage(),
436 timestamp: chrono::Utc::now(),
437 user_id: self.audit_user_id.clone(),
438 tenant_id: self.audit_tenant_id.clone(),
439 delegation_chain: self.audit_delegation_chain.clone(),
440 })
441 .await;
442 }
443
444 if let Ok(ref mut output) = result {
446 let consolidation_usage = self.consolidate_memory_on_exit().await;
448 if consolidation_usage.input_tokens > 0 || consolidation_usage.output_tokens > 0 {
449 output.tokens_used += consolidation_usage;
450 if let Some(consolidation_cost) = self.estimate_cost(&consolidation_usage) {
453 output.estimated_cost_usd =
454 Some(output.estimated_cost_usd.unwrap_or(0.0) + consolidation_cost);
455 }
456 }
457
458 self.prune_memory_on_exit().await;
460 }
461
462 result
463 }
464
465 async fn execute_inner(
466 &self,
467 initial_ctx: AgentContext,
468 task: &str,
469 usage_acc: Arc<std::sync::Mutex<TokenUsage>>,
470 ) -> Result<AgentOutput, (Error, TokenUsage)> {
471 let mode = self.observability_mode;
472 let run_span = info_span!(
473 "heartbit.agent.run",
474 agent = %self.name,
475 max_turns = self.max_turns,
476 task = tracing::field::Empty,
477 model = tracing::field::Empty,
478 total_input_tokens = tracing::field::Empty,
479 total_output_tokens = tracing::field::Empty,
480 estimated_cost_usd = tracing::field::Empty,
481 );
482 if mode.includes_metrics()
483 && let Some(model) = self.provider.model_name()
484 {
485 run_span.record("model", model);
486 }
487 if mode.includes_payloads() {
488 run_span.record(
489 "task",
490 truncate_for_event(task, EVENT_MAX_PAYLOAD_BYTES).as_str(),
491 );
492 } else if mode.includes_metrics() {
493 let cut = crate::tool::builtins::floor_char_boundary(task, 256);
494 run_span.record("task", &task[..cut]);
495 }
496
497 let result = async {
498 self.emit(AgentEvent::RunStarted {
499 agent: self.name.clone(),
500 task: task.to_string(),
501 });
502
503 let mut ctx = initial_ctx;
504
505 let mut total_tool_calls = 0usize;
506 let mut total_usage = TokenUsage::default();
507 let mut total_cost: f64 = 0.0;
509 let mut recently_used_tools: Vec<String> = Vec::new();
511 let mut doom_tracker = DoomLoopTracker::new();
512 let mut last_model_name: Option<String> = None;
513 let mut compacted_last_turn = false;
516
517 loop {
518 if ctx.current_turn() >= ctx.max_turns() {
519 self.emit(AgentEvent::RunFailed {
520 agent: self.name.clone(),
521 error: format!("Max turns ({}) exceeded", ctx.max_turns()),
522 partial_usage: total_usage,
523 });
524 return Err((Error::MaxTurnsExceeded(ctx.max_turns()), total_usage));
525 }
526
527 ctx.increment_turn();
528 let can_compact = !compacted_last_turn;
529 compacted_last_turn = false;
530 debug!(agent = %self.name, turn = ctx.current_turn(), "executing turn");
531 self.emit(AgentEvent::TurnStarted {
532 agent: self.name.clone(),
533 turn: ctx.current_turn(),
534 max_turns: ctx.max_turns(),
535 });
536
537 for g in &self.guardrails {
539 g.set_turn(ctx.current_turn());
540 }
541
542 let mut request = if let Some(ref prune_config) = self.session_prune_config {
544 let mut req = ctx.to_request();
545 let (pruned_msgs, prune_stats) =
546 pruner::prune_old_tool_results(&req.messages, prune_config);
547 req.messages = pruned_msgs;
548 if prune_stats.did_prune() {
549 debug!(
550 agent = %self.name,
551 turn = ctx.current_turn(),
552 pruned = prune_stats.tool_results_pruned,
553 total = prune_stats.tool_results_total,
554 bytes_saved = prune_stats.bytes_saved,
555 "session pruning applied"
556 );
557 self.emit(AgentEvent::SessionPruned {
558 agent: self.name.clone(),
559 turn: ctx.current_turn(),
560 tool_results_pruned: prune_stats.tool_results_pruned,
561 bytes_saved: prune_stats.bytes_saved,
562 tool_results_total: prune_stats.tool_results_total,
563 });
564 }
565 req
566 } else {
567 ctx.to_request()
568 };
569
570 if let Some(profile) = self.tool_profile {
572 request.tools = tool_filter::filter_tools(&request.tools, profile);
573 }
574
575 if let Some(max_tools) = self.max_tools_per_turn {
577 request.tools = self.select_tools_for_turn(
578 &request.tools,
579 &request.messages,
580 &recently_used_tools,
581 max_tools,
582 );
583 }
584
585 for g in &self.guardrails {
586 if let Err(e) = g.pre_llm(&mut request).await {
587 self.emit(AgentEvent::RunFailed {
588 agent: self.name.clone(),
589 error: e.to_string(),
590 partial_usage: total_usage,
591 });
592 return Err((e, total_usage));
593 }
594 }
595 let cache_key = if self.response_cache.is_some() && self.on_text.is_none() {
601 let tool_names: Vec<&str> =
602 request.tools.iter().map(|t| t.name.as_str()).collect();
603 let namespace = match (&self.audit_tenant_id, &self.audit_user_id) {
604 (Some(t), Some(u)) => Some(format!("{t}:{u}")),
605 (Some(t), None) => Some(t.clone()),
606 (None, Some(u)) => Some(format!(":{u}")),
607 (None, None) => None,
608 };
609 Some(cache::ResponseCache::compute_key_scoped(
610 &request.system,
611 &request.messages,
612 &tool_names,
613 namespace.as_deref(),
614 ))
615 } else {
616 None
617 };
618 let cache_hit = cache_key
620 .and_then(|k| self.response_cache.as_ref().and_then(|c| c.get(k)));
621 let llm_start = Instant::now();
622 let llm_span = info_span!(
623 "heartbit.agent.llm_call",
624 agent = %self.name,
625 turn = ctx.current_turn(),
626 { observability::GEN_AI_REQUEST_MODEL } = tracing::field::Empty,
627 latency_ms = tracing::field::Empty,
628 { observability::GEN_AI_USAGE_INPUT_TOKENS } = tracing::field::Empty,
629 { observability::GEN_AI_USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
630 { observability::GEN_AI_RESPONSE_FINISH_REASON } = tracing::field::Empty,
631 tool_call_count = tracing::field::Empty,
632 ttft_ms = tracing::field::Empty,
633 response_text = tracing::field::Empty,
634 cache_hit = tracing::field::Empty,
635 );
636 let llm_result = if let Some(cached) = cache_hit {
637 tracing::debug!(
638 agent = %self.name,
639 turn = ctx.current_turn(),
640 "response cache hit, skipping LLM call"
641 );
642 if mode.includes_metrics() {
643 llm_span.record("cache_hit", true);
644 }
645 Ok(cached)
646 } else {
647 let ttft_ms_inner = Arc::new(std::sync::atomic::AtomicU64::new(0));
649 let ttft_ref = ttft_ms_inner.clone();
650 let result = async {
651 match &self.on_text {
652 Some(cb) => {
653 let ttft_ref = ttft_ref.clone();
654 let start = llm_start;
655 let inner_cb = cb.clone();
656 let wrapper: Box<crate::llm::OnText> =
657 Box::new(move |text: &str| {
658 ttft_ref
659 .compare_exchange(
660 0,
661 start.elapsed().as_millis() as u64,
662 std::sync::atomic::Ordering::Relaxed,
663 std::sync::atomic::Ordering::Relaxed,
664 )
665 .ok();
666 inner_cb(text);
667 });
668 self.provider.stream_complete(request, &*wrapper).await
669 }
670 None => self.provider.complete(request).await,
671 }
672 }
673 .instrument(llm_span.clone())
674 .await;
675 if let (Ok(resp), Some(key)) = (&result, cache_key)
679 && resp.stop_reason == crate::llm::types::StopReason::EndTurn
680 && let Some(ref c) = self.response_cache
681 {
682 c.put(key, resp.clone());
683 }
684 if mode.includes_metrics() {
685 let ttft = ttft_ms_inner.load(std::sync::atomic::Ordering::Relaxed);
686 llm_span.record("ttft_ms", ttft);
687 llm_span.record("cache_hit", false);
688 }
689 result
690 };
691 let llm_latency_ms = llm_start.elapsed().as_millis() as u64;
692 if mode.includes_metrics() {
694 llm_span.record("latency_ms", llm_latency_ms);
695 if let Ok(ref r) = llm_result {
696 if let Some(ref model) = r.model {
697 llm_span.record(observability::GEN_AI_REQUEST_MODEL, model.as_str());
698 } else if let Some(model) = self.provider.model_name() {
699 llm_span.record(observability::GEN_AI_REQUEST_MODEL, model);
700 }
701 } else if let Some(model) = self.provider.model_name() {
702 llm_span.record(observability::GEN_AI_REQUEST_MODEL, model);
703 }
704 if let Ok(ref r) = llm_result {
705 llm_span.record(
706 observability::GEN_AI_USAGE_INPUT_TOKENS,
707 r.usage.input_tokens,
708 );
709 llm_span.record(
710 observability::GEN_AI_USAGE_OUTPUT_TOKENS,
711 r.usage.output_tokens,
712 );
713 llm_span.record(
714 observability::GEN_AI_RESPONSE_FINISH_REASON,
715 format!("{:?}", r.stop_reason).as_str(),
716 );
717 llm_span.record("tool_call_count", r.tool_calls().len());
718 }
719 }
720 if mode.includes_payloads()
721 && let Ok(ref r) = llm_result
722 {
723 llm_span.record(
724 "response_text",
725 truncate_for_event(&r.text(), EVENT_MAX_PAYLOAD_BYTES).as_str(),
726 );
727 }
728 let mut response = match llm_result {
729 Ok(r) => r,
730 Err(e) => {
731 if crate::llm::error_class::classify(&e)
733 == crate::llm::error_class::ErrorClass::ContextOverflow
734 && can_compact
735 && ctx.message_count() > 5
736 {
737 tracing::warn!(
738 agent = %self.name,
739 error = %e,
740 "context overflow detected, attempting auto-compaction"
741 );
742 match self.generate_summary(&ctx).await {
743 Ok((Some(summary), summary_usage)) => {
744 total_usage += summary_usage;
745 if let Some(c) = self.estimate_cost(&summary_usage) {
746 total_cost += c;
747 }
748 *usage_acc.lock().expect("usage lock poisoned") = total_usage;
749 self.flush_to_memory_before_compaction(&ctx, 4).await;
750 ctx.inject_summary(summary, 4);
751 self.emit(AgentEvent::AutoCompactionTriggered {
752 agent: self.name.clone(),
753 turn: ctx.current_turn(),
754 success: true,
755 usage: summary_usage,
756 });
757 self.emit(AgentEvent::ContextSummarized {
758 agent: self.name.clone(),
759 turn: ctx.current_turn(),
760 usage: summary_usage,
761 });
762 compacted_last_turn = true;
763 continue;
764 }
765 Ok((None, summary_usage)) => {
766 total_usage += summary_usage;
767 *usage_acc.lock().expect("usage lock poisoned") = total_usage;
768 self.emit(AgentEvent::AutoCompactionTriggered {
769 agent: self.name.clone(),
770 turn: ctx.current_turn(),
771 success: false,
772 usage: summary_usage,
773 });
774 tracing::warn!(
775 agent = %self.name,
776 "auto-compaction summary was truncated, cannot compact"
777 );
778 }
779 Err(summary_err) => {
780 self.emit(AgentEvent::AutoCompactionTriggered {
781 agent: self.name.clone(),
782 turn: ctx.current_turn(),
783 success: false,
784 usage: TokenUsage::default(),
785 });
786 tracing::warn!(
787 agent = %self.name,
788 error = %summary_err,
789 "auto-compaction summary failed"
790 );
791 }
792 }
793 }
794 self.emit(AgentEvent::RunFailed {
795 agent: self.name.clone(),
796 error: e.to_string(),
797 partial_usage: total_usage,
798 });
799 return Err((e, total_usage));
800 }
801 };
802 total_usage += response.usage;
803
804 if let (Some(tracker), Some(tid)) =
808 (&self.tenant_tracker, &self.audit_tenant_id)
809 {
810 let actual =
811 (total_usage.input_tokens + total_usage.output_tokens) as usize;
812 let prev = self
813 .cumulative_actual_tokens
814 .swap(actual, std::sync::atomic::Ordering::SeqCst);
815 let delta = actual as i64 - prev as i64;
816 let scope = crate::auth::TenantScope::new(tid.clone());
817 tracker.adjust(&scope, delta);
818 }
819
820 let turn_model = response
822 .model
823 .as_deref()
824 .or_else(|| self.provider.model_name());
825 if let Some(model) = turn_model {
826 last_model_name = Some(model.to_string());
827 if let Some(cost) =
828 crate::llm::pricing::estimate_cost(model, &response.usage)
829 {
830 total_cost += cost;
831 }
832 }
833 *usage_acc.lock().expect("usage lock poisoned") = total_usage;
835
836 if let Some(max) = self.max_total_tokens {
838 let used = total_usage.total();
839 if used > max {
840 self.emit(AgentEvent::BudgetExceeded {
841 agent: self.name.clone(),
842 used,
843 limit: max,
844 partial_usage: total_usage,
845 });
846 return Err((
847 Error::BudgetExceeded { used, limit: max },
848 total_usage,
849 ));
850 }
851 }
852
853 let mut tool_calls = response.tool_calls();
854
855 for call in tool_calls.iter_mut() {
863 if !self.tools.contains_key(&call.name)
864 && let Some(repaired) = self.find_closest_tool(&call.name, 2)
865 {
866 let repaired = repaired.to_string();
867 tracing::warn!(
868 agent = %self.name,
869 original = %call.name,
870 repaired = %repaired,
871 "tool name repaired via Levenshtein match (pre-policy)"
872 );
873 self.emit(AgentEvent::ToolNameRepaired {
874 agent: self.name.clone(),
875 original: call.name.clone(),
876 repaired: repaired.clone(),
877 });
878 call.name = repaired;
879 }
880 }
881
882 if let Some(cap) = self.max_tool_calls_per_turn
885 && tool_calls.len() as u32 > cap
886 {
887 let err = Error::Agent(format!(
888 "tool-call cap exceeded: turn produced {} calls, max is {cap}",
889 tool_calls.len()
890 ));
891 self.emit(AgentEvent::RunFailed {
892 agent: self.name.clone(),
893 error: err.to_string(),
894 partial_usage: total_usage,
895 });
896 return Err((err, total_usage));
897 }
898
899 self.emit(AgentEvent::LlmResponse {
900 agent: self.name.clone(),
901 turn: ctx.current_turn(),
902 usage: response.usage,
903 stop_reason: response.stop_reason,
904 tool_call_count: tool_calls.len(),
905 text: truncate_for_event(&response.text(), EVENT_MAX_PAYLOAD_BYTES),
906 latency_ms: llm_latency_ms,
907 model: response
908 .model
909 .clone()
910 .or_else(|| self.provider.model_name().map(|s| s.to_string())),
911 time_to_first_token_ms: 0,
912 });
913
914 self.audit(AuditRecord {
916 agent: self.name.clone(),
917 turn: ctx.current_turn(),
918 event_type: "llm_response".into(),
919 payload: serde_json::json!({
920 "text": response.text(),
921 "stop_reason": format!("{:?}", response.stop_reason),
922 "tool_call_count": tool_calls.len(),
923 "latency_ms": llm_latency_ms,
924 "model": response.model.as_deref()
925 .or_else(|| self.provider.model_name()),
926 }),
927 usage: response.usage,
928 timestamp: chrono::Utc::now(),
929 user_id: self.audit_user_id.clone(),
930 tenant_id: self.audit_tenant_id.clone(),
931 delegation_chain: self.audit_delegation_chain.clone(),
932 })
933 .await;
934
935 let mut post_llm_denied = false;
940 for g in &self.guardrails {
941 match g
942 .post_llm(&mut response)
943 .await
944 .map_err(|e| (e, total_usage))?
945 {
946 GuardAction::Allow => {}
947 GuardAction::Warn { reason } => {
948 self.emit(AgentEvent::GuardrailWarned {
949 agent: self.name.clone(),
950 hook: "post_llm".into(),
951 reason: reason.clone(),
952 tool_name: None,
953 });
954 self.audit(AuditRecord {
955 agent: self.name.clone(),
956 turn: ctx.current_turn(),
957 event_type: "guardrail_warned".into(),
958 payload: serde_json::json!({
959 "hook": "post_llm",
960 "reason": reason,
961 }),
962 usage: TokenUsage::default(),
963 timestamp: chrono::Utc::now(),
964 user_id: self.audit_user_id.clone(),
965 tenant_id: self.audit_tenant_id.clone(),
966 delegation_chain: self.audit_delegation_chain.clone(),
967 })
968 .await;
969 }
971 GuardAction::Deny { reason } => {
972 self.emit(AgentEvent::GuardrailDenied {
973 agent: self.name.clone(),
974 hook: "post_llm".into(),
975 reason: reason.clone(),
976 tool_name: None,
977 });
978 self.audit(AuditRecord {
980 agent: self.name.clone(),
981 turn: ctx.current_turn(),
982 event_type: "guardrail_denied".into(),
983 payload: serde_json::json!({
984 "hook": "post_llm",
985 "reason": reason,
986 }),
987 usage: TokenUsage::default(),
988 timestamp: chrono::Utc::now(),
989 user_id: self.audit_user_id.clone(),
990 tenant_id: self.audit_tenant_id.clone(),
991 delegation_chain: self.audit_delegation_chain.clone(),
992 })
993 .await;
994 ctx.add_assistant_message(Message {
996 role: crate::llm::types::Role::Assistant,
997 content: vec![ContentBlock::Text {
998 text: "[Response denied by guardrail]".into(),
999 }],
1000 });
1001 ctx.add_user_message(format!(
1002 "[Guardrail denied your previous response: {reason}. Please try again.]"
1003 ));
1004 post_llm_denied = true;
1005 break;
1006 }
1007 GuardAction::Kill { reason } => {
1008 self.emit(AgentEvent::KillSwitchActivated {
1009 agent: self.name.clone(),
1010 reason: reason.clone(),
1011 guardrail_name: g.name().to_string(),
1012 });
1013 self.audit(AuditRecord {
1014 agent: self.name.clone(),
1015 turn: ctx.current_turn(),
1016 event_type: "guardrail_killed".into(),
1017 payload: serde_json::json!({
1018 "hook": "post_llm",
1019 "reason": reason,
1020 }),
1021 usage: TokenUsage::default(),
1022 timestamp: chrono::Utc::now(),
1023 user_id: self.audit_user_id.clone(),
1024 tenant_id: self.audit_tenant_id.clone(),
1025 delegation_chain: self.audit_delegation_chain.clone(),
1026 })
1027 .await;
1028 return Err((
1029 Error::KillSwitch(reason),
1030 total_usage,
1031 ));
1032 }
1033 }
1034 }
1035 if post_llm_denied {
1036 continue;
1037 }
1038
1039 ctx.add_assistant_message(Message {
1041 role: crate::llm::types::Role::Assistant,
1042 content: response.content,
1043 });
1044
1045 ctx.evict_media();
1047
1048 if let Some(ref schema) = self.structured_schema
1053 && let Some(respond_call) = tool_calls
1054 .iter()
1055 .find(|tc| tc.name == crate::llm::types::RESPOND_TOOL_NAME)
1056 {
1057 let structured = respond_call.input.clone();
1058
1059 if let Err(validation_error) =
1061 crate::tool::validate_tool_input(schema, &structured)
1062 {
1063 total_tool_calls += tool_calls.len();
1066 tracing::warn!(
1067 agent = %self.name,
1068 error = %validation_error,
1069 "structured output failed schema validation, retrying"
1070 );
1071 ctx.add_tool_results(vec![ToolResult {
1072 tool_use_id: respond_call.id.clone(),
1073 content: format!(
1074 "Structured output validation failed: {validation_error}. \
1075 Please fix the output to match the schema and call __respond__ again."
1076 ),
1077 is_error: true,
1078 }]);
1079 continue;
1080 }
1081
1082 total_tool_calls += tool_calls.len();
1083 let text = serde_json::to_string_pretty(&structured)
1084 .unwrap_or_else(|_| structured.to_string());
1085 self.emit(AgentEvent::RunCompleted {
1086 agent: self.name.clone(),
1087 total_usage,
1088 tool_calls_made: total_tool_calls,
1089 });
1090 let preview_end =
1092 crate::tool::builtins::floor_char_boundary(&text, 1000);
1093 self.audit(AuditRecord {
1094 agent: self.name.clone(),
1095 turn: ctx.current_turn(),
1096 event_type: "run_completed".into(),
1097 payload: serde_json::json!({
1098 "total_tool_calls": total_tool_calls,
1099 "result_preview": &text[..preview_end],
1100 }),
1101 usage: total_usage,
1102 timestamp: chrono::Utc::now(),
1103 user_id: self.audit_user_id.clone(),
1104 tenant_id: self.audit_tenant_id.clone(),
1105 delegation_chain: self.audit_delegation_chain.clone(),
1106 })
1107 .await;
1108 return Ok(AgentOutput {
1109 result: text,
1110 tool_calls_made: total_tool_calls,
1111 tokens_used: total_usage,
1112 structured: Some(structured),
1113 estimated_cost_usd: if total_cost > 0.0 {
1114 Some(total_cost)
1115 } else {
1116 self.estimate_cost(&total_usage)
1117 },
1118 model_name: last_model_name.clone(),
1119 });
1120 }
1121
1122 if tool_calls.is_empty() {
1123 if response.stop_reason == StopReason::MaxTokens {
1125 self.emit(AgentEvent::RunFailed {
1126 agent: self.name.clone(),
1127 error: "Response truncated (max_tokens reached)".into(),
1128 partial_usage: total_usage,
1129 });
1130 return Err((Error::Truncated, total_usage));
1131 }
1132
1133 if self.structured_schema.is_some() {
1137 self.emit(AgentEvent::RunFailed {
1138 agent: self.name.clone(),
1139 error: "LLM returned text without calling __respond__".into(),
1140 partial_usage: total_usage,
1141 });
1142 return Err((
1143 Error::Agent(
1144 "LLM returned text without calling __respond__; \
1145 structured output was not produced"
1146 .into(),
1147 ),
1148 total_usage,
1149 ));
1150 }
1151
1152 if let Some(ref on_input) = self.on_input
1155 && let Some(next_message) = on_input().await
1156 && !next_message.trim().is_empty()
1157 {
1158 ctx.add_user_message(next_message);
1159 continue;
1160 }
1161
1162 self.emit(AgentEvent::RunCompleted {
1163 agent: self.name.clone(),
1164 total_usage,
1165 tool_calls_made: total_tool_calls,
1166 });
1167 let result_text =
1168 ctx.last_assistant_text().unwrap_or_default().to_string();
1169 let preview_end =
1171 crate::tool::builtins::floor_char_boundary(&result_text, 1000);
1172 self.audit(AuditRecord {
1173 agent: self.name.clone(),
1174 turn: ctx.current_turn(),
1175 event_type: "run_completed".into(),
1176 payload: serde_json::json!({
1177 "total_tool_calls": total_tool_calls,
1178 "result_preview": &result_text[..preview_end],
1179 }),
1180 usage: total_usage,
1181 timestamp: chrono::Utc::now(),
1182 user_id: self.audit_user_id.clone(),
1183 tenant_id: self.audit_tenant_id.clone(),
1184 delegation_chain: self.audit_delegation_chain.clone(),
1185 })
1186 .await;
1187 return Ok(AgentOutput {
1188 result: result_text,
1189 tool_calls_made: total_tool_calls,
1190 tokens_used: total_usage,
1191 structured: None,
1192 estimated_cost_usd: if total_cost > 0.0 {
1193 Some(total_cost)
1194 } else {
1195 self.estimate_cost(&total_usage)
1196 },
1197 model_name: last_model_name.clone(),
1198 });
1199 }
1200
1201 let (tool_calls, permission_denied_results) = if self.has_permission_rules() {
1212 let mut allowed = Vec::new();
1213 let mut denied = Vec::new();
1214 let mut needs_approval = Vec::new();
1215
1216 for call in tool_calls {
1217 match self.eval_permission(&call.name, &call.input) {
1218 Some(permission::PermissionAction::Allow) => {
1219 allowed.push(call);
1220 }
1221 Some(permission::PermissionAction::Deny) => {
1222 debug!(
1223 agent = %self.name,
1224 tool = %call.name,
1225 "tool call denied by permission rule"
1226 );
1227 denied.push(ToolResult::error(
1228 call.id.clone(),
1229 format!("Permission denied for tool '{}'", call.name),
1230 ));
1231 }
1232 Some(permission::PermissionAction::Ask) | None => {
1233 needs_approval.push(call);
1234 }
1235 }
1236 }
1237
1238 if !needs_approval.is_empty() {
1240 if let Some(ref cb) = self.on_approval {
1241 self.emit(AgentEvent::ApprovalRequested {
1242 agent: self.name.clone(),
1243 turn: ctx.current_turn(),
1244 tool_names: needs_approval
1245 .iter()
1246 .map(|tc| tc.name.clone())
1247 .collect(),
1248 });
1249 let decision = cb(&needs_approval);
1250 self.emit(AgentEvent::ApprovalDecision {
1251 agent: self.name.clone(),
1252 turn: ctx.current_turn(),
1253 approved: decision.is_allowed(),
1254 });
1255 if decision.is_persistent() {
1257 self.persist_approval_decision(&needs_approval, decision);
1258 }
1259 if decision.is_allowed() {
1260 allowed.extend(needs_approval);
1261 } else {
1262 for call in &needs_approval {
1263 denied.push(ToolResult::error(
1264 call.id.clone(),
1265 "Tool execution denied by human reviewer".to_string(),
1266 ));
1267 }
1268 }
1269 } else {
1270 allowed.extend(needs_approval);
1272 }
1273 }
1274
1275 if allowed.is_empty() && !denied.is_empty() {
1277 total_tool_calls += denied.len();
1278 ctx.add_tool_results(denied);
1279 continue;
1280 }
1281
1282 (allowed, denied)
1283 } else if let Some(ref cb) = self.on_approval {
1284 self.emit(AgentEvent::ApprovalRequested {
1286 agent: self.name.clone(),
1287 turn: ctx.current_turn(),
1288 tool_names: tool_calls.iter().map(|tc| tc.name.clone()).collect(),
1289 });
1290 let decision = cb(&tool_calls);
1291 self.emit(AgentEvent::ApprovalDecision {
1292 agent: self.name.clone(),
1293 turn: ctx.current_turn(),
1294 approved: decision.is_allowed(),
1295 });
1296 if decision.is_persistent() {
1298 self.persist_approval_decision(&tool_calls, decision);
1299 }
1300 if !decision.is_allowed() {
1301 debug!(
1302 agent = %self.name,
1303 "tool execution denied by approval callback"
1304 );
1305 let results: Vec<ToolResult> = tool_calls
1306 .iter()
1307 .map(|tc| {
1308 ToolResult::error(
1309 tc.id.clone(),
1310 "Tool execution denied by human reviewer".to_string(),
1311 )
1312 })
1313 .collect();
1314 total_tool_calls += tool_calls.len();
1315 ctx.add_tool_results(results);
1316 continue;
1317 }
1318 (tool_calls, Vec::new())
1319 } else {
1320 (tool_calls, Vec::new())
1321 };
1322
1323 if let Some(threshold) = self.max_identical_tool_calls {
1326 let (exact, fuzzy) = doom_tracker.record(
1327 &tool_calls,
1328 threshold,
1329 self.max_fuzzy_identical_tool_calls,
1330 );
1331 if exact {
1332 debug!(
1333 agent = %self.name,
1334 count = doom_tracker.count(),
1335 "doom loop detected, returning error results"
1336 );
1337 self.emit(AgentEvent::DoomLoopDetected {
1338 agent: self.name.clone(),
1339 turn: ctx.current_turn(),
1340 consecutive_count: doom_tracker.count(),
1341 tool_names: tool_calls
1342 .iter()
1343 .map(|tc| tc.name.clone())
1344 .collect(),
1345 });
1346 let results: Vec<ToolResult> = tool_calls
1347 .iter()
1348 .map(|tc| {
1349 ToolResult::error(
1350 tc.id.clone(),
1351 format!(
1352 "Doom loop detected: identical tool calls repeated {} \
1353 times consecutively. Try a different approach.",
1354 doom_tracker.count()
1355 ),
1356 )
1357 })
1358 .collect();
1359 total_tool_calls += tool_calls.len();
1360 ctx.add_tool_results(results);
1361 continue;
1362 } else if fuzzy {
1363 debug!(
1364 agent = %self.name,
1365 count = doom_tracker.fuzzy_count(),
1366 "fuzzy doom loop detected, returning error results"
1367 );
1368 self.emit(AgentEvent::FuzzyDoomLoopDetected {
1369 agent: self.name.clone(),
1370 turn: ctx.current_turn(),
1371 consecutive_count: doom_tracker.fuzzy_count(),
1372 tool_names: tool_calls
1373 .iter()
1374 .map(|tc| tc.name.clone())
1375 .collect(),
1376 });
1377 let results: Vec<ToolResult> = tool_calls
1378 .iter()
1379 .map(|tc| {
1380 ToolResult::error(
1381 tc.id.clone(),
1382 format!(
1383 "Fuzzy doom loop detected: same tools with different \
1384 inputs repeated {} times consecutively. Try a \
1385 completely different approach.",
1386 doom_tracker.fuzzy_count()
1387 ),
1388 )
1389 })
1390 .collect();
1391 total_tool_calls += tool_calls.len();
1392 ctx.add_tool_results(results);
1393 continue;
1394 }
1395 }
1396
1397 let (allowed_calls, denied_results) = if self.guardrails.is_empty() {
1399 (tool_calls, Vec::new())
1400 } else {
1401 let mut allowed = Vec::new();
1402 let mut denied = Vec::new();
1403 for call in tool_calls {
1404 let mut call_denied = false;
1405 for g in &self.guardrails {
1406 match g.pre_tool(&call).await.map_err(|e| (e, total_usage))? {
1407 GuardAction::Allow => {}
1408 GuardAction::Warn { reason } => {
1409 self.emit(AgentEvent::GuardrailWarned {
1410 agent: self.name.clone(),
1411 hook: "pre_tool".into(),
1412 reason: reason.clone(),
1413 tool_name: Some(call.name.clone()),
1414 });
1415 self.audit(AuditRecord {
1416 agent: self.name.clone(),
1417 turn: ctx.current_turn(),
1418 event_type: "guardrail_warned".into(),
1419 payload: serde_json::json!({
1420 "hook": "pre_tool",
1421 "reason": reason,
1422 "tool_name": call.name,
1423 }),
1424 usage: TokenUsage::default(),
1425 timestamp: chrono::Utc::now(),
1426 user_id: self.audit_user_id.clone(),
1427 tenant_id: self.audit_tenant_id.clone(),
1428 delegation_chain: self.audit_delegation_chain.clone(),
1429 })
1430 .await;
1431 }
1433 GuardAction::Deny { reason } => {
1434 self.emit(AgentEvent::GuardrailDenied {
1435 agent: self.name.clone(),
1436 hook: "pre_tool".into(),
1437 reason: reason.clone(),
1438 tool_name: Some(call.name.clone()),
1439 });
1440 self.audit(AuditRecord {
1442 agent: self.name.clone(),
1443 turn: ctx.current_turn(),
1444 event_type: "guardrail_denied".into(),
1445 payload: serde_json::json!({
1446 "hook": "pre_tool",
1447 "reason": reason,
1448 "tool_name": call.name,
1449 }),
1450 usage: TokenUsage::default(),
1451 timestamp: chrono::Utc::now(),
1452 user_id: self.audit_user_id.clone(),
1453 tenant_id: self.audit_tenant_id.clone(),
1454 delegation_chain: self.audit_delegation_chain.clone(),
1455 })
1456 .await;
1457 denied.push(ToolResult::error(
1458 call.id.clone(),
1459 format!("Guardrail denied: {reason}"),
1460 ));
1461 call_denied = true;
1462 break;
1463 }
1464 GuardAction::Kill { reason } => {
1465 self.emit(AgentEvent::KillSwitchActivated {
1466 agent: self.name.clone(),
1467 reason: reason.clone(),
1468 guardrail_name: g.name().to_string(),
1469 });
1470 self.audit(AuditRecord {
1471 agent: self.name.clone(),
1472 turn: ctx.current_turn(),
1473 event_type: "guardrail_killed".into(),
1474 payload: serde_json::json!({
1475 "hook": "pre_tool",
1476 "reason": reason,
1477 "tool_name": call.name,
1478 }),
1479 usage: TokenUsage::default(),
1480 timestamp: chrono::Utc::now(),
1481 user_id: self.audit_user_id.clone(),
1482 tenant_id: self.audit_tenant_id.clone(),
1483 delegation_chain: self.audit_delegation_chain.clone(),
1484 })
1485 .await;
1486 return Err((
1487 Error::KillSwitch(reason),
1488 total_usage,
1489 ));
1490 }
1491 }
1492 }
1493 if !call_denied {
1494 allowed.push(call);
1495 }
1496 }
1497 (allowed, denied)
1498 };
1499
1500 total_tool_calls +=
1501 allowed_calls.len() + denied_results.len() + permission_denied_results.len();
1502 recently_used_tools = allowed_calls.iter().map(|c| c.name.clone()).collect();
1504 let tool_batch_span = info_span!(
1505 "heartbit.agent.tool_batch",
1506 agent = %self.name,
1507 turn = ctx.current_turn(),
1508 tool_count = allowed_calls.len(),
1509 );
1510 let mut results = self
1511 .execute_tools_parallel(&allowed_calls, ctx.current_turn())
1512 .instrument(tool_batch_span)
1513 .await;
1514 results.extend(denied_results);
1515 results.extend(permission_denied_results);
1516
1517 if let Some(ref lsp) = self.lsp_manager {
1520 self.append_lsp_diagnostics(lsp, &allowed_calls, &mut results)
1521 .await;
1522 }
1523
1524 if let Some(threshold) = self.tool_output_compression_threshold {
1526 for result in &mut results {
1527 if !result.is_error && result.content.len() > threshold {
1528 let compressed = self
1529 .compress_tool_output(&result.content, threshold, &mut total_usage)
1530 .await;
1531 result.content = compressed;
1532 }
1533 }
1534 *usage_acc.lock().expect("usage lock poisoned") = total_usage;
1535 }
1536
1537 ctx.add_tool_results(results);
1538
1539 if self.enable_reflection {
1542 ctx.add_user_message(
1543 "Before proceeding, briefly reflect on the tool results above:\n\
1544 1. Did you get the information you needed?\n\
1545 2. Are there any errors or unexpected results?\n\
1546 3. What is the best next step?"
1547 .to_string(),
1548 );
1549 }
1550
1551 if let Some(threshold) = self.summarize_threshold
1555 && ctx.message_count() > 5
1556 && ctx.needs_compaction(threshold)
1557 {
1558 debug!(agent = %self.name, "context exceeds threshold, summarizing");
1559 let summarize_span = info_span!(
1560 "heartbit.agent.summarize",
1561 agent = %self.name,
1562 turn = ctx.current_turn(),
1563 );
1564 let (summary, summary_usage) =
1565 match self.generate_summary(&ctx).instrument(summarize_span).await {
1566 Ok(r) => r,
1567 Err(e) => {
1568 self.emit(AgentEvent::RunFailed {
1569 agent: self.name.clone(),
1570 error: e.to_string(),
1571 partial_usage: total_usage,
1572 });
1573 return Err((e, total_usage));
1574 }
1575 };
1576 total_usage += summary_usage;
1577 *usage_acc.lock().expect("usage lock poisoned") = total_usage;
1578 if let Some(summary) = summary {
1579 self.flush_to_memory_before_compaction(&ctx, 4).await;
1580 ctx.inject_summary(summary, 4);
1581 self.emit(AgentEvent::ContextSummarized {
1582 agent: self.name.clone(),
1583 turn: ctx.current_turn(),
1584 usage: summary_usage,
1585 });
1586 }
1587 }
1588 }
1589 }
1590 .instrument(run_span.clone())
1591 .await;
1592
1593 if mode.includes_metrics() {
1595 let usage = match &result {
1596 Ok(output) => &output.tokens_used,
1597 Err((_, usage)) => usage,
1598 };
1599 run_span.record("total_input_tokens", usage.input_tokens);
1600 run_span.record("total_output_tokens", usage.output_tokens);
1601 if let Ok(ref output) = result
1602 && let Some(cost) = output.estimated_cost_usd
1603 {
1604 run_span.record("estimated_cost_usd", cost);
1605 }
1606 }
1607
1608 result
1609 }
1610
1611 async fn generate_summary(
1617 &self,
1618 ctx: &AgentContext,
1619 ) -> Result<(Option<String>, TokenUsage), Error> {
1620 let text = ctx.conversation_text();
1621 let lines: Vec<&str> = text.lines().collect();
1622
1623 const CLUSTER_SIZE: usize = 10;
1625 if self.enable_recursive_summarization && lines.len() > CLUSTER_SIZE * 2 {
1626 return self.generate_recursive_summary(&lines, CLUSTER_SIZE).await;
1627 }
1628
1629 self.summarize_text(&text).await
1630 }
1631
1632 async fn summarize_text(&self, text: &str) -> Result<(Option<String>, TokenUsage), Error> {
1634 let summary_request = CompletionRequest {
1635 system: "You are a summarization assistant. Summarize the following conversation \
1636 concisely, preserving key facts, decisions, and tool results. \
1637 Focus on information that would be needed to continue the conversation."
1638 .into(),
1639 messages: vec![Message::user(text.to_string())],
1640 tools: vec![],
1641 max_tokens: 1024,
1642 tool_choice: None,
1643 reasoning_effort: None,
1644 };
1645
1646 let response = self.provider.complete(summary_request).await?;
1647 let usage = response.usage;
1648 if response.stop_reason == StopReason::MaxTokens {
1649 tracing::warn!(
1650 agent = %self.name,
1651 "summarization truncated (max_tokens reached), skipping compaction"
1652 );
1653 return Ok((None, usage));
1654 }
1655 Ok((Some(response.text()), usage))
1656 }
1657
1658 async fn generate_recursive_summary(
1663 &self,
1664 lines: &[&str],
1665 cluster_size: usize,
1666 ) -> Result<(Option<String>, TokenUsage), Error> {
1667 let mut total_usage = TokenUsage::default();
1668 let mut cluster_summaries = Vec::new();
1669
1670 for chunk in lines.chunks(cluster_size) {
1672 let cluster_text = chunk.join("\n");
1673 let (summary, usage) = self.summarize_text(&cluster_text).await?;
1674 total_usage += usage;
1675 match summary {
1676 Some(s) => cluster_summaries.push(s),
1677 None => {
1678 let full_text = lines.join("\n");
1680 let (summary, usage) = self.summarize_text(&full_text).await?;
1681 total_usage += usage;
1682 return Ok((summary, total_usage));
1683 }
1684 }
1685 }
1686
1687 let combined = format!(
1689 "Summarize the following section summaries into one cohesive summary:\n\n{}",
1690 cluster_summaries
1691 .iter()
1692 .enumerate()
1693 .map(|(i, s)| format!("Section {}:\n{}", i + 1, s))
1694 .collect::<Vec<_>>()
1695 .join("\n\n")
1696 );
1697 let (final_summary, combine_usage) = self.summarize_text(&combined).await?;
1698 total_usage += combine_usage;
1699 Ok((final_summary, total_usage))
1700 }
1701
1702 fn memory_scope(&self) -> crate::auth::TenantScope {
1706 crate::auth::TenantScope::from_audit_fields(
1707 self.audit_tenant_id.as_deref(),
1708 self.audit_user_id.as_deref(),
1709 )
1710 }
1711
1712 async fn flush_to_memory_before_compaction(&self, ctx: &AgentContext, keep_last_n: usize) {
1717 let Some(ref memory) = self.memory else {
1718 return;
1719 };
1720
1721 let messages = ctx.messages_to_be_compacted(keep_last_n);
1722 let now = chrono::Utc::now();
1723
1724 for msg in messages {
1725 if msg.role != crate::llm::types::Role::User {
1726 continue;
1727 }
1728 for block in &msg.content {
1729 if let ContentBlock::ToolResult {
1730 content, is_error, ..
1731 } = block
1732 {
1733 if *is_error || content.len() < 50 {
1735 continue;
1736 }
1737 let stored_content = if content.len() > 500 {
1739 format!(
1740 "{}...",
1741 &content[..crate::tool::builtins::floor_char_boundary(content, 500)]
1742 )
1743 } else {
1744 content.clone()
1745 };
1746 let id = uuid::Uuid::new_v4().to_string();
1747 let entry = crate::memory::MemoryEntry {
1748 id,
1749 agent: self.name.clone(),
1750 content: stored_content,
1751 category: "fact".into(),
1752 tags: vec!["auto-flush".into()],
1753 created_at: now,
1754 last_accessed: now,
1755 access_count: 0,
1756 importance: 3,
1757 memory_type: crate::memory::MemoryType::Episodic,
1758 keywords: vec![],
1759 summary: None,
1760 strength: 0.8,
1761 related_ids: vec![],
1762 source_ids: vec![],
1763 embedding: None,
1764 confidentiality: crate::memory::Confidentiality::default(),
1765 author_user_id: None,
1766 author_tenant_id: None,
1767 };
1768 let scope = self.memory_scope();
1769 if let Err(e) = memory.store(&scope, entry).await {
1770 tracing::warn!(
1771 agent = %self.name,
1772 error = %e,
1773 "failed to flush tool result to memory before compaction"
1774 );
1775 }
1776 }
1777 }
1778 }
1779 }
1780
1781 async fn prune_memory_on_exit(&self) {
1786 let Some(ref memory) = self.memory else {
1787 return;
1788 };
1789 let scope = self.memory_scope();
1790 match crate::memory::pruning::prune_weak_entries(
1791 memory,
1792 &scope,
1793 crate::memory::pruning::DEFAULT_MIN_STRENGTH,
1794 crate::memory::pruning::default_min_age(),
1795 )
1796 .await
1797 {
1798 Ok(0) => {}
1799 Ok(n) => {
1800 tracing::debug!(agent = %self.name, pruned = n, "pruned weak memory entries at session end");
1801 }
1802 Err(e) => {
1803 tracing::warn!(agent = %self.name, error = %e, "memory pruning failed at session end");
1804 }
1805 }
1806 }
1807
1808 async fn consolidate_memory_on_exit(&self) -> TokenUsage {
1813 if !self.consolidate_on_exit {
1814 return TokenUsage::default();
1815 }
1816 let Some(ref memory) = self.memory else {
1817 return TokenUsage::default();
1818 };
1819 let pipeline = crate::memory::consolidation::ConsolidationPipeline::new(
1820 memory.clone(),
1821 self.provider.clone(),
1822 &self.name,
1823 );
1824 let scope = self.memory_scope();
1825 match pipeline.run(&scope).await {
1826 Ok((0, _, usage)) => usage,
1827 Ok((clusters, entries, usage)) => {
1828 tracing::debug!(
1829 agent = %self.name,
1830 clusters,
1831 entries,
1832 "consolidated memories at session end"
1833 );
1834 usage
1835 }
1836 Err(e) => {
1837 tracing::warn!(
1838 agent = %self.name,
1839 error = %e,
1840 "memory consolidation failed at session end"
1841 );
1842 TokenUsage::default()
1843 }
1844 }
1845 }
1846
1847 pub(super) fn select_tools_for_turn(
1854 &self,
1855 all_tools: &[ToolDefinition],
1856 messages: &[Message],
1857 recently_used: &[String],
1858 max_tools: usize,
1859 ) -> Vec<ToolDefinition> {
1860 if all_tools.len() <= max_tools {
1861 return all_tools.to_vec();
1862 }
1863
1864 let recent_text: String = messages
1866 .iter()
1867 .rev()
1868 .take(4)
1869 .flat_map(|m| m.content.iter())
1870 .filter_map(|block| match block {
1871 ContentBlock::Text { text } => Some(text.as_str()),
1872 _ => None,
1873 })
1874 .collect::<Vec<_>>()
1875 .join(" ")
1876 .to_lowercase();
1877
1878 let keywords: Vec<&str> = recent_text
1879 .split(|c: char| !c.is_alphanumeric() && c != '_')
1880 .filter(|w| w.len() > 2)
1881 .collect();
1882
1883 let mut selected: Vec<ToolDefinition> = Vec::new();
1886 let mut candidates: Vec<(ToolDefinition, usize)> = Vec::new();
1887
1888 for tool in all_tools {
1889 if recently_used.contains(&tool.name)
1890 || tool.name == crate::llm::types::RESPOND_TOOL_NAME
1891 {
1892 selected.push(tool.clone());
1893 } else {
1894 let tool_text = format!("{} {}", tool.name, tool.description).to_lowercase();
1896 let score = keywords
1897 .iter()
1898 .filter(|kw| tool_text.contains(**kw))
1899 .count();
1900 candidates.push((tool.clone(), score));
1901 }
1902 }
1903
1904 candidates.sort_by_key(|c| std::cmp::Reverse(c.1));
1906
1907 let remaining = max_tools.saturating_sub(selected.len());
1909 selected.extend(candidates.into_iter().take(remaining).map(|(t, _)| t));
1910
1911 selected.truncate(max_tools);
1912 selected
1913 }
1914
1915 async fn compress_tool_output(
1920 &self,
1921 content: &str,
1922 threshold: usize,
1923 usage_acc: &mut TokenUsage,
1924 ) -> String {
1925 if content.len() < threshold {
1926 return content.to_string();
1927 }
1928 let original_len = content.len();
1929 let request = CompletionRequest {
1930 system: "Compress the following tool output, preserving all factual content, \
1931 key values, and actionable information. Remove redundancy and formatting \
1932 noise. Return ONLY the compressed content."
1933 .into(),
1934 messages: vec![Message::user(content.to_string())],
1935 tools: vec![],
1936 max_tokens: (self.max_tokens / 3).max(256),
1937 tool_choice: None,
1938 reasoning_effort: None,
1939 };
1940 match self.provider.complete(request).await {
1941 Ok(resp) => {
1942 *usage_acc += resp.usage;
1943 let compressed = resp.text();
1944 if compressed.is_empty() {
1945 content.to_string()
1946 } else {
1947 format!("{compressed}\n[compressed from {original_len} bytes]")
1948 }
1949 }
1950 Err(e) => {
1951 debug!(agent = %self.name, error = %e, "tool output compression failed, using original");
1952 content.to_string()
1953 }
1954 }
1955 }
1956
1957 pub(super) fn find_closest_tool(&self, name: &str, max_distance: usize) -> Option<&str> {
1960 self.tools
1961 .keys()
1962 .map(|k| (k.as_str(), levenshtein(name, k)))
1963 .filter(|(_, d)| *d <= max_distance && *d > 0)
1964 .min_by_key(|(_, d)| *d)
1965 .map(|(name, _)| name)
1966 }
1967
1968 async fn append_lsp_diagnostics(
1971 &self,
1972 lsp: &crate::lsp::LspManager,
1973 calls: &[ToolCall],
1974 results: &mut [ToolResult],
1975 ) {
1976 for (idx, call) in calls.iter().enumerate() {
1977 if !crate::lsp::is_file_modifying_tool(&call.name) {
1978 continue;
1979 }
1980 if idx < results.len() && results[idx].is_error {
1982 continue;
1983 }
1984 let path_str = match call
1986 .input
1987 .get("path")
1988 .or_else(|| call.input.get("file_path"))
1989 {
1990 Some(serde_json::Value::String(s)) => s.clone(),
1991 _ => continue,
1992 };
1993 let path = std::path::Path::new(&path_str);
1994 let diagnostics = lsp.notify_file_changed(path).await;
1995 if diagnostics.is_empty() {
1996 tracing::debug!(
1997 agent = %self.name,
1998 path = %path_str,
1999 "lsp: no diagnostics for file"
2000 );
2001 } else {
2002 let formatted = crate::lsp::format_diagnostics(&path_str, &diagnostics);
2003 tracing::info!(
2004 agent = %self.name,
2005 path = %path_str,
2006 count = diagnostics.len(),
2007 "lsp-diagnostics appended to tool result"
2008 );
2009 if idx < results.len() {
2010 results[idx].content.push('\n');
2011 results[idx].content.push_str(&formatted);
2012 }
2013 }
2014 }
2015 }
2016
2017 async fn execute_tools_parallel(&self, calls: &[ToolCall], turn: usize) -> Vec<ToolResult> {
2022 let call_ids: Vec<String> = calls.iter().map(|c| c.id.clone()).collect();
2023 let call_names: Vec<String> = calls.iter().map(|c| c.name.clone()).collect();
2024 let mut join_set = tokio::task::JoinSet::new();
2025
2026 let exec_ctx = crate::ExecutionContext {
2030 tenant_id: self.audit_tenant_id.clone(),
2031 user_id: self.audit_user_id.clone(),
2032 workspace: None,
2033 credentials: None,
2034 audit_sink: None,
2035 };
2036
2037 for (idx, call) in calls.iter().enumerate() {
2038 let tool = self.tools.get(&call.name).cloned();
2044 let input = call.input.clone();
2045 let call_name = call.name.clone();
2046 let timeout = self.tool_timeout;
2047
2048 self.emit(AgentEvent::ToolCallStarted {
2049 agent: self.name.clone(),
2050 tool_name: call.name.clone(),
2051 tool_call_id: call.id.clone(),
2052 input: truncate_for_event(
2053 &serde_json::to_string(&call.input).unwrap_or_default(),
2054 EVENT_MAX_PAYLOAD_BYTES,
2055 ),
2056 });
2057
2058 self.audit(AuditRecord {
2060 agent: self.name.clone(),
2061 turn,
2062 event_type: "tool_call".into(),
2063 payload: serde_json::json!({
2064 "tool_name": call.name,
2065 "tool_call_id": call.id,
2066 "input": call.input,
2067 }),
2068 usage: TokenUsage::default(),
2069 timestamp: chrono::Utc::now(),
2070 user_id: self.audit_user_id.clone(),
2071 tenant_id: self.audit_tenant_id.clone(),
2072 delegation_chain: self.audit_delegation_chain.clone(),
2073 })
2074 .await;
2075
2076 if let Some(ref t) = tool {
2079 let schema = &t.definition().input_schema;
2080 if let Err(msg) = validate_tool_input(schema, &input) {
2081 join_set.spawn(async move { (idx, Ok(ToolOutput::error(msg)), 0u64) });
2082 continue;
2083 }
2084 }
2085
2086 let tool_span = info_span!(
2087 "heartbit.agent.tool_call",
2088 agent = %self.name,
2089 tool_name = %call.name,
2090 );
2091 let task_ctx = exec_ctx.clone();
2092 join_set.spawn(
2093 async move {
2094 let start = std::time::Instant::now();
2095 let output = match tool {
2096 Some(t) => match timeout {
2097 Some(dur) => {
2098 match tokio::time::timeout(dur, t.execute(&task_ctx, input)).await {
2099 Ok(result) => result,
2100 Err(_) => Ok(ToolOutput::error(format!(
2101 "Tool execution timed out after {}s",
2102 dur.as_secs_f64()
2103 ))),
2104 }
2105 }
2106 None => t.execute(&task_ctx, input).await,
2107 },
2108 None => Ok(ToolOutput::error(format!("Tool not found: {call_name}"))),
2109 };
2110 let duration_ms = start.elapsed().as_millis() as u64;
2111 (idx, output, duration_ms)
2112 }
2113 .instrument(tool_span),
2114 );
2115 }
2116
2117 let mut outputs: Vec<Option<(ToolOutput, u64)>> = vec![None; calls.len()];
2119 while let Some(result) = join_set.join_next().await {
2120 match result {
2121 Ok((idx, Ok(output), duration_ms)) => {
2122 let output = match self.max_tool_output_bytes {
2123 Some(max) => output.truncated(max),
2124 None => output,
2125 };
2126 outputs[idx] = Some((output, duration_ms));
2127 }
2128 Ok((idx, Err(e), duration_ms)) => {
2129 outputs[idx] = Some((ToolOutput::error(e.to_string()), duration_ms));
2130 }
2131 Err(join_err) => {
2132 tracing::error!(error = %join_err, "tool task panicked");
2133 }
2134 }
2135 }
2136
2137 let mut results_vec = Vec::with_capacity(calls.len());
2139 for (idx, slot) in outputs.into_iter().enumerate() {
2140 let (mut output, duration_ms) = slot
2141 .unwrap_or_else(|| (ToolOutput::error("Tool execution panicked".to_string()), 0));
2142
2143 for g in &self.guardrails {
2145 if let Err(e) = g.post_tool(&calls[idx], &mut output).await {
2146 self.emit(AgentEvent::GuardrailDenied {
2147 agent: self.name.clone(),
2148 hook: "post_tool".into(),
2149 reason: e.to_string(),
2150 tool_name: Some(call_names[idx].clone()),
2151 });
2152 self.audit(AuditRecord {
2154 agent: self.name.clone(),
2155 turn,
2156 event_type: "guardrail_denied".into(),
2157 payload: serde_json::json!({
2158 "hook": "post_tool",
2159 "reason": e.to_string(),
2160 "tool_name": call_names[idx],
2161 }),
2162 usage: TokenUsage::default(),
2163 timestamp: chrono::Utc::now(),
2164 user_id: self.audit_user_id.clone(),
2170 tenant_id: self.audit_tenant_id.clone(),
2171 delegation_chain: self.audit_delegation_chain.clone(),
2172 })
2173 .await;
2174 output = ToolOutput::error(format!("Guardrail error: {e}"));
2177 break;
2178 }
2179 }
2180
2181 let is_error = output.is_error;
2182 self.emit(AgentEvent::ToolCallCompleted {
2183 agent: self.name.clone(),
2184 tool_name: call_names[idx].clone(),
2185 tool_call_id: call_ids[idx].clone(),
2186 is_error,
2187 duration_ms,
2188 output: truncate_for_event(&output.content, EVENT_MAX_PAYLOAD_BYTES),
2189 });
2190 self.audit(AuditRecord {
2192 agent: self.name.clone(),
2193 turn,
2194 event_type: "tool_result".into(),
2195 payload: serde_json::json!({
2196 "tool_name": call_names[idx],
2197 "tool_call_id": call_ids[idx],
2198 "output": output.content,
2199 "is_error": is_error,
2200 "duration_ms": duration_ms,
2201 }),
2202 usage: TokenUsage::default(),
2203 timestamp: chrono::Utc::now(),
2204 user_id: self.audit_user_id.clone(),
2205 tenant_id: self.audit_tenant_id.clone(),
2206 delegation_chain: self.audit_delegation_chain.clone(),
2207 })
2208 .await;
2209 results_vec.push(tool_output_to_result(call_ids[idx].clone(), output));
2210 }
2211
2212 results_vec
2213 }
2214}
2215
2216impl<P: LlmProvider> Drop for AgentRunner<P> {
2217 fn drop(&mut self) {
2218 if let (Some(tracker), Some(tid)) =
2219 (self.tenant_tracker.as_ref(), self.audit_tenant_id.as_ref())
2220 {
2221 let actual = self
2222 .cumulative_actual_tokens
2223 .load(std::sync::atomic::Ordering::SeqCst) as i64;
2224 if actual > 0 {
2225 let scope = crate::auth::TenantScope::new(tid.clone());
2226 tracker.adjust(&scope, -actual);
2227 }
2228 }
2229 }
2230}
2231
2232pub(super) fn tool_output_to_result(tool_use_id: String, output: ToolOutput) -> ToolResult {
2233 if output.is_error {
2234 ToolResult::error(tool_use_id, output.content)
2235 } else {
2236 ToolResult::success(tool_use_id, output.content)
2237 }
2238}
2239
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::sync::Arc;

    use crate::agent::tenant_tracker::TenantTokenTracker;
    use crate::auth::TenantScope;
    use crate::error::Error;
    use crate::llm::types::{
        CompletionResponse, ContentBlock, StopReason, TokenUsage, ToolDefinition,
    };
    use crate::tool::{Tool, ToolOutput};

    use super::super::test_helpers::MockProvider;
    use super::AgentRunner;

    /// Boxed future returned by the `Tool::execute` implementations in this module.
    type ToolFuture<'a> =
        Pin<Box<dyn std::future::Future<Output = Result<ToolOutput, Error>> + Send + 'a>>;

    /// Tool named `noop` that ignores its input and always succeeds with `ok`.
    struct NoopTool;

    impl Tool for NoopTool {
        fn definition(&self) -> ToolDefinition {
            let input_schema = serde_json::json!({"type": "object", "properties": {}});
            ToolDefinition {
                name: "noop".into(),
                description: "Does nothing.".into(),
                input_schema,
            }
        }

        fn execute(
            &self,
            _ctx: &crate::ExecutionContext,
            _input: serde_json::Value,
        ) -> ToolFuture<'_> {
            Box::pin(async { Ok(ToolOutput::success("ok".to_string())) })
        }
    }

    /// Builds a provider response that requests one `noop` tool call (id `call-1`) and
    /// reports the given input/output token counts.
    fn tool_use_response(input_tokens: u32, output_tokens: u32) -> CompletionResponse {
        let usage = TokenUsage {
            input_tokens,
            output_tokens,
            ..Default::default()
        };
        let noop_call = ContentBlock::ToolUse {
            id: "call-1".into(),
            name: "noop".into(),
            input: serde_json::json!({}),
        };
        CompletionResponse {
            content: vec![noop_call],
            stop_reason: StopReason::ToolUse,
            usage,
            model: None,
        }
    }

    /// A single-turn run adds its token usage to the tenant's in-flight count, and
    /// dropping the runner brings the count back to zero.
    #[tokio::test(flavor = "multi_thread")]
    async fn agent_runner_adjusts_tenant_tracker_per_turn() {
        let tokens = Arc::new(TenantTokenTracker::new(1_000_000));
        let acme = TenantScope::new("acme");
        // Reserve and release immediately; presumably this registers the tenant so that
        // `snapshot()` has an entry to index. In-flight must be back at zero afterwards.
        drop(tokens.reserve(&acme, 5000).unwrap());
        assert_eq!(tokens.snapshot()[0].1.in_flight, 0);

        let responses = vec![MockProvider::text_response("done", 100, 200)];
        let provider = Arc::new(MockProvider::new(responses));

        let runner = AgentRunner::builder(provider)
            .name("test")
            .system_prompt("test")
            .audit_user_context("test-user", "acme")
            .tenant_tracker(tokens.clone())
            .max_turns(1)
            .build()
            .unwrap();
        let _output = runner.execute("hello").await.unwrap();

        // 100 input + 200 output tokens from the single response.
        let after_run = tokens.snapshot();
        assert_eq!(after_run[0].1.in_flight, 300);

        drop(runner);
        let after_drop = tokens.snapshot();
        assert_eq!(after_drop[0].1.in_flight, 0);
    }

    /// Token usage of several turns accumulates in the tracker and is released as a
    /// whole when the runner is dropped.
    #[tokio::test(flavor = "multi_thread")]
    async fn agent_runner_adjusts_tracker_cumulatively_across_turns() {
        let tokens = Arc::new(TenantTokenTracker::new(1_000_000));
        let acme = TenantScope::new("acme");
        drop(tokens.reserve(&acme, 5000).unwrap());

        // Turn 1 asks for the noop tool, turn 2 ends the run with plain text.
        let responses = vec![
            tool_use_response(100, 200),
            MockProvider::text_response("done", 50, 150),
        ];
        let provider = Arc::new(MockProvider::new(responses));

        let runner = AgentRunner::builder(provider)
            .name("test")
            .system_prompt("test")
            .audit_user_context("test-user", "acme")
            .tenant_tracker(tokens.clone())
            .max_turns(2)
            .tool(Arc::new(NoopTool))
            .build()
            .unwrap();
        let _output = runner.execute("hello").await.unwrap();

        // (100 + 200) from turn 1 plus (50 + 150) from turn 2.
        let after_run = tokens.snapshot();
        assert_eq!(after_run[0].1.in_flight, 500);

        drop(runner);
        assert_eq!(tokens.snapshot()[0].1.in_flight, 0);
    }

    /// The tenant id configured through `audit_user_context` reaches the tool through
    /// its `ExecutionContext`.
    #[tokio::test]
    async fn execution_context_propagates_to_tool() {
        use std::sync::Mutex;

        use crate::ExecutionContext;
        use crate::llm::types::ToolCall;

        /// Tool that records the `tenant_id` of the context it was executed with.
        struct CtxCapturingTool {
            captured_tenant: Arc<Mutex<Option<String>>>,
        }

        impl Tool for CtxCapturingTool {
            fn definition(&self) -> ToolDefinition {
                ToolDefinition {
                    name: "ctx_capture".into(),
                    description: "Captures the tenant_id from ExecutionContext.".into(),
                    input_schema: serde_json::json!({"type": "object"}),
                }
            }

            fn execute(&self, ctx: &ExecutionContext, _input: serde_json::Value) -> ToolFuture<'_> {
                let sink = self.captured_tenant.clone();
                let seen_tenant = ctx.tenant_id.clone();
                Box::pin(async move {
                    *sink.lock().unwrap() = seen_tenant;
                    Ok(ToolOutput::success("ok"))
                })
            }
        }

        let captured = Arc::new(Mutex::new(None));
        let tool = Arc::new(CtxCapturingTool {
            captured_tenant: captured.clone(),
        });

        // No provider responses are needed: the tool batch is driven directly.
        let provider = Arc::new(MockProvider::new(vec![]));
        let runner = AgentRunner::builder(provider)
            .name("test")
            .system_prompt("test")
            .max_turns(1)
            .tools(vec![tool as Arc<dyn Tool>])
            .audit_user_context("test-user", "test-tenant")
            .build()
            .unwrap();

        let calls = vec![ToolCall {
            id: "c1".into(),
            name: "ctx_capture".into(),
            input: serde_json::json!({}),
        }];
        let _results = runner.execute_tools_parallel(&calls, 0).await;

        assert_eq!(
            captured.lock().unwrap().as_deref(),
            Some("test-tenant"),
            "tool did not receive the tenant_id from ExecutionContext"
        );
    }
}