1use adk_core::{
2 AfterAgentCallback, AfterModelCallback, AfterToolCallback, AfterToolCallbackFull, Agent,
3 BeforeAgentCallback, BeforeModelCallback, BeforeModelResult, BeforeToolCallback,
4 CallbackContext, Content, Event, EventActions, FunctionResponseData, GlobalInstructionProvider,
5 InstructionProvider, InvocationContext, Llm, LlmRequest, LlmResponse, MemoryEntry,
6 OnToolErrorCallback, Part, ReadonlyContext, Result, RetryBudget, Tool, ToolCallbackContext,
7 ToolConfirmationDecision, ToolConfirmationPolicy, ToolConfirmationRequest, ToolContext,
8 ToolExecutionStrategy, ToolOutcome, Toolset,
9};
10use async_stream::stream;
11use async_trait::async_trait;
12use std::sync::{Arc, Mutex};
13use tracing::Instrument;
14
15#[cfg(feature = "enhanced-plugins")]
16use adk_plugin::{
17 BeforeModelCallResult, BeforeToolCallResult, EnhancedPlugin, EnhancedPluginManager,
18};
19
20#[cfg(feature = "skills")]
21use crate::skill_shim::load_skill_index;
22use crate::{
23 guardrails::{GuardrailSet, enforce_guardrails},
24 skill_shim::{SelectionPolicy, SkillIndex, select_skill_prompt_block},
25 tool_call_markup::normalize_option_content,
26 workflow::with_user_content_override,
27};
28
/// Iteration limit that `LlmAgentBuilder::new` assigns to `max_iterations`
/// unless the caller overrides it with `LlmAgentBuilder::max_iterations`.
pub const DEFAULT_MAX_ITERATIONS: u32 = 100;
31
/// Tool timeout (300 seconds) that `LlmAgentBuilder::new` assigns to
/// `tool_timeout` unless the caller overrides it with `LlmAgentBuilder::tool_timeout`.
pub const DEFAULT_TOOL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300);
34
35fn trace_json_payload<T: serde::Serialize>(
36 value: &T,
37 record_payloads: bool,
38 max_bytes: usize,
39) -> String {
40 let json = serde_json::to_string(value).unwrap_or_default();
41 if cfg!(feature = "record-payloads") && record_payloads {
42 return json;
43 }
44
45 let max_bytes = max_bytes.max(32);
46 if json.len() <= max_bytes {
47 return json;
48 }
49
50 let mut end = max_bytes;
51 while !json.is_char_boundary(end) {
52 end -= 1;
53 }
54 format!("{}...[truncated {} bytes]", &json[..end], json.len() - end)
55}
56
/// An agent driven by a language model, assembled by `LlmAgentBuilder::build`.
///
/// Every field is copied from the builder; callback lists and guardrail sets
/// are stored behind `Arc`.
pub struct LlmAgent {
    /// Agent name, returned by `Agent::name`.
    name: String,
    /// Agent description, returned by `Agent::description`.
    description: String,
    /// Language model backing this agent.
    model: Arc<dyn Llm>,
    /// Instruction template; `run` passes it through `adk_core::inject_session_state`
    /// and only uses it when no `instruction_provider` is set.
    instruction: Option<String>,
    /// Dynamic instruction source; takes precedence over `instruction` in `run`.
    instruction_provider: Option<Arc<InstructionProvider>>,
    /// Global instruction template; only used when no `global_instruction_provider` is set.
    global_instruction: Option<String>,
    /// Dynamic global instruction source; takes precedence over `global_instruction`.
    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
    /// Skill index that `run` matches against the user's text to pick a skill prompt block.
    skills_index: Option<Arc<SkillIndex>>,
    /// Policy passed to `select_skill_prompt_block`.
    skill_policy: SelectionPolicy,
    /// Size limit passed to `select_skill_prompt_block`.
    max_skill_chars: usize,
    /// Input schema supplied through the builder; not read by the code visible in this file section.
    #[allow(dead_code)]
    input_schema: Option<serde_json::Value>,
    /// When set, `run` adds a prompt entry instructing the model to answer with JSON
    /// conforming to this schema.
    output_schema: Option<serde_json::Value>,
    /// Retry count associated with structured output.
    // NOTE(review): presumably the number of re-prompts after a schema validation
    // failure; the consuming code is outside this section, so confirm there.
    output_max_retries: usize,
    /// Builder flag controlling transfer to the parent agent (consumer not visible here).
    disallow_transfer_to_parent: bool,
    /// Builder flag controlling transfer to peer agents (consumer not visible here).
    disallow_transfer_to_peers: bool,
    /// Chooses whether `run` sends the session history or only the current user content.
    include_contents: adk_core::IncludeContents,
    /// Statically registered tools.
    tools: Vec<Arc<dyn Tool>>,
    /// Toolsets whose tools are resolved on each `run`.
    toolsets: Vec<Arc<dyn Toolset>>,
    /// Sub-agents, returned by `Agent::sub_agents`; names are unique (checked in `build`).
    sub_agents: Vec<Arc<dyn Agent>>,
    /// Optional output key supplied through the builder.
    output_key: Option<String>,
    /// Optional generation settings (temperature, top-p, top-k, max output tokens, ...).
    generate_content_config: Option<adk_core::GenerateContentConfig>,
    /// Iteration limit; defaults to `DEFAULT_MAX_ITERATIONS`.
    max_iterations: u32,
    /// Tool timeout; defaults to `DEFAULT_TOOL_TIMEOUT`.
    tool_timeout: std::time::Duration,
    /// Callbacks invoked at the start of `run`; the first one returning content
    /// short-circuits the run.
    before_callbacks: Arc<Vec<BeforeAgentCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::after_callback`.
    after_callbacks: Arc<Vec<AfterAgentCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::before_model_callback`.
    before_model_callbacks: Arc<Vec<BeforeModelCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::after_model_callback`.
    after_model_callbacks: Arc<Vec<AfterModelCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::before_tool_callback`.
    before_tool_callbacks: Arc<Vec<BeforeToolCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::after_tool_callback`.
    after_tool_callbacks: Arc<Vec<AfterToolCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::on_tool_error`.
    on_tool_error_callbacks: Arc<Vec<OnToolErrorCallback>>,
    /// Callbacks registered with `LlmAgentBuilder::after_tool_callback_full`.
    after_tool_callbacks_full: Arc<Vec<AfterToolCallbackFull>>,
    /// Retry budget used when a tool has no entry in `tool_retry_budgets`.
    // NOTE(review): fallback semantics inferred from the field names; confirm
    // against the tool execution code.
    default_retry_budget: Option<RetryBudget>,
    /// Retry budgets keyed by tool name.
    tool_retry_budgets: std::collections::HashMap<String, RetryBudget>,
    /// Failure threshold for the per-tool circuit breaker, if enabled.
    circuit_breaker_threshold: Option<u32>,
    /// Policy deciding which tools need confirmation; the builder default is `Never`.
    tool_confirmation_policy: ToolConfirmationPolicy,
    /// Optional tool execution strategy supplied through the builder.
    tool_execution_strategy: Option<ToolExecutionStrategy>,
    /// Guardrails applied to the user content at the start of `run`.
    input_guardrails: Arc<GuardrailSet>,
    /// Guardrails applied to produced content via `apply_output_guardrails`.
    output_guardrails: Arc<GuardrailSet>,
    /// Manager for enhanced plugins; `None` when no plugin was registered.
    #[cfg(feature = "enhanced-plugins")]
    enhanced_plugin_manager: Option<Arc<EnhancedPluginManager>>,
    /// Sandbox configuration, exposed through `LlmAgent::sandbox_config`.
    #[cfg(feature = "sandbox")]
    sandbox_config: Option<adk_sandbox::workspace::SandboxConfig>,
}
125
126impl std::fmt::Debug for LlmAgent {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("LlmAgent")
129 .field("name", &self.name)
130 .field("description", &self.description)
131 .field("model", &self.model.name())
132 .field("instruction", &self.instruction)
133 .field("tools_count", &self.tools.len())
134 .field("sub_agents_count", &self.sub_agents.len())
135 .finish()
136 }
137}
138
139impl LlmAgent {
140 #[cfg(feature = "sandbox")]
147 pub fn sandbox_config(&self) -> Option<&adk_sandbox::workspace::SandboxConfig> {
148 self.sandbox_config.as_ref()
149 }
150
151 async fn apply_input_guardrails(
152 ctx: Arc<dyn InvocationContext>,
153 input_guardrails: Arc<GuardrailSet>,
154 ) -> Result<Arc<dyn InvocationContext>> {
155 let content =
156 enforce_guardrails(input_guardrails.as_ref(), ctx.user_content(), "input").await?;
157 if content.role != ctx.user_content().role || content.parts != ctx.user_content().parts {
158 Ok(with_user_content_override(ctx, content))
159 } else {
160 Ok(ctx)
161 }
162 }
163
164 async fn apply_output_guardrails(
165 output_guardrails: &GuardrailSet,
166 content: Content,
167 ) -> Result<Content> {
168 enforce_guardrails(output_guardrails, &content, "output").await
169 }
170
171 fn history_parts_from_provider_metadata(
172 provider_metadata: Option<&serde_json::Value>,
173 ) -> Vec<Part> {
174 let Some(provider_metadata) = provider_metadata else {
175 return Vec::new();
176 };
177
178 let history_parts = provider_metadata
179 .get("conversation_history_parts")
180 .or_else(|| {
181 provider_metadata
182 .get("openai")
183 .and_then(|openai| openai.get("conversation_history_parts"))
184 })
185 .and_then(serde_json::Value::as_array);
186
187 history_parts
188 .into_iter()
189 .flatten()
190 .filter_map(|value| serde_json::from_value::<Part>(value.clone()).ok())
191 .collect()
192 }
193
194 fn augment_content_for_history(
195 content: &Content,
196 provider_metadata: Option<&serde_json::Value>,
197 ) -> Content {
198 let mut augmented = content.clone();
199 augmented.parts.extend(Self::history_parts_from_provider_metadata(provider_metadata));
200 augmented
201 }
202}
203
204fn validate_output_against_schema(
209 text: &str,
210 schema: &serde_json::Value,
211) -> std::result::Result<serde_json::Value, String> {
212 let parsed: serde_json::Value =
213 serde_json::from_str(text).map_err(|e| format!("Response is not valid JSON: {e}"))?;
214
215 let validator =
216 jsonschema::validator_for(schema).map_err(|e| format!("Invalid schema: {e}"))?;
217
218 let errors: Vec<String> = validator.iter_errors(&parsed).map(|e| e.to_string()).collect();
219
220 if errors.is_empty() { Ok(parsed) } else { Err(errors.join("; ")) }
221}
222
223fn extract_text_from_events(events: &[Event]) -> Option<String> {
228 for event in events.iter().rev() {
229 if let Some(ref content) = event.llm_response.content {
230 let text: String =
231 content
232 .parts
233 .iter()
234 .filter_map(|p| {
235 if let Part::Text { text } = p { Some(text.as_str()) } else { None }
236 })
237 .collect::<Vec<_>>()
238 .join("");
239 if !text.is_empty() {
240 return Some(text);
241 }
242 }
243 }
244 None
245}
246
247pub fn extract_typed<T: serde::de::DeserializeOwned>(events: &[Event]) -> Result<T> {
269 let text = extract_text_from_events(events).ok_or_else(|| {
270 adk_core::AdkError::agent("no text content found in events for typed extraction")
271 })?;
272
273 serde_json::from_str(&text)
274 .map_err(|e| adk_core::AdkError::agent(format!("output deserialization failed: {e}")))
275}
276
/// Builder for `LlmAgent`.
///
/// Fields mirror those of `LlmAgent`. Callback lists and guardrail sets are held
/// as plain values here and wrapped in `Arc` by `build`; `description` and
/// `model` are optional until `build` resolves them.
pub struct LlmAgentBuilder {
    /// Agent name given to `new`.
    name: String,
    /// Optional description; `build` substitutes an empty string when unset.
    description: Option<String>,
    /// Language model; `build` fails when this is still `None`.
    model: Option<Arc<dyn Llm>>,
    /// Instruction template.
    instruction: Option<String>,
    /// Dynamic instruction source.
    instruction_provider: Option<Arc<InstructionProvider>>,
    /// Global instruction template.
    global_instruction: Option<String>,
    /// Dynamic global instruction source.
    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
    /// Skill index, if one was supplied or loaded.
    skills_index: Option<Arc<SkillIndex>>,
    /// Skill selection policy; starts at `SelectionPolicy::default()`.
    skill_policy: SelectionPolicy,
    /// Skill prompt size limit; starts at 2000.
    max_skill_chars: usize,
    /// Optional input schema.
    input_schema: Option<serde_json::Value>,
    /// Optional output schema.
    output_schema: Option<serde_json::Value>,
    /// Structured-output retry count; starts at 3.
    output_max_retries: usize,
    /// Flag controlling transfer to the parent agent; starts `false`.
    disallow_transfer_to_parent: bool,
    /// Flag controlling transfer to peer agents; starts `false`.
    disallow_transfer_to_peers: bool,
    /// History inclusion mode; starts at `IncludeContents::Default`.
    include_contents: adk_core::IncludeContents,
    /// Statically registered tools.
    tools: Vec<Arc<dyn Tool>>,
    /// Registered toolsets.
    toolsets: Vec<Arc<dyn Toolset>>,
    /// Registered sub-agents; `build` rejects duplicate names.
    sub_agents: Vec<Arc<dyn Agent>>,
    /// Optional output key.
    output_key: Option<String>,
    /// Optional generation settings.
    generate_content_config: Option<adk_core::GenerateContentConfig>,
    /// Iteration limit; starts at `DEFAULT_MAX_ITERATIONS`.
    max_iterations: u32,
    /// Tool timeout; starts at `DEFAULT_TOOL_TIMEOUT`.
    tool_timeout: std::time::Duration,
    /// Before-agent callbacks, in registration order.
    before_callbacks: Vec<BeforeAgentCallback>,
    /// After-agent callbacks, in registration order.
    after_callbacks: Vec<AfterAgentCallback>,
    /// Before-model callbacks, in registration order.
    before_model_callbacks: Vec<BeforeModelCallback>,
    /// After-model callbacks, in registration order.
    after_model_callbacks: Vec<AfterModelCallback>,
    /// Before-tool callbacks, in registration order.
    before_tool_callbacks: Vec<BeforeToolCallback>,
    /// After-tool callbacks, in registration order.
    after_tool_callbacks: Vec<AfterToolCallback>,
    /// Tool-error callbacks, in registration order.
    on_tool_error_callbacks: Vec<OnToolErrorCallback>,
    /// Full after-tool callbacks, in registration order.
    after_tool_callbacks_full: Vec<AfterToolCallbackFull>,
    /// Optional default retry budget.
    default_retry_budget: Option<RetryBudget>,
    /// Retry budgets keyed by tool name.
    tool_retry_budgets: std::collections::HashMap<String, RetryBudget>,
    /// Optional circuit breaker failure threshold.
    circuit_breaker_threshold: Option<u32>,
    /// Tool confirmation policy; starts at `ToolConfirmationPolicy::Never`.
    tool_confirmation_policy: ToolConfirmationPolicy,
    /// Optional tool execution strategy.
    tool_execution_strategy: Option<ToolExecutionStrategy>,
    /// Input guardrails; starts as an empty set.
    input_guardrails: GuardrailSet,
    /// Output guardrails; starts as an empty set.
    output_guardrails: GuardrailSet,
    /// Registered enhanced plugins.
    #[cfg(feature = "enhanced-plugins")]
    enhanced_plugins: Vec<Arc<dyn EnhancedPlugin>>,
    /// Optional sandbox configuration.
    #[cfg(feature = "sandbox")]
    sandbox_config: Option<adk_sandbox::workspace::SandboxConfig>,
}
324
325impl LlmAgentBuilder {
326 pub fn new(name: impl Into<String>) -> Self {
328 Self {
329 name: name.into(),
330 description: None,
331 model: None,
332 instruction: None,
333 instruction_provider: None,
334 global_instruction: None,
335 global_instruction_provider: None,
336 skills_index: None,
337 skill_policy: SelectionPolicy::default(),
338 max_skill_chars: 2000,
339 input_schema: None,
340 output_schema: None,
341 output_max_retries: 3,
342 disallow_transfer_to_parent: false,
343 disallow_transfer_to_peers: false,
344 include_contents: adk_core::IncludeContents::Default,
345 tools: Vec::new(),
346 toolsets: Vec::new(),
347 sub_agents: Vec::new(),
348 output_key: None,
349 generate_content_config: None,
350 max_iterations: DEFAULT_MAX_ITERATIONS,
351 tool_timeout: DEFAULT_TOOL_TIMEOUT,
352 before_callbacks: Vec::new(),
353 after_callbacks: Vec::new(),
354 before_model_callbacks: Vec::new(),
355 after_model_callbacks: Vec::new(),
356 before_tool_callbacks: Vec::new(),
357 after_tool_callbacks: Vec::new(),
358 on_tool_error_callbacks: Vec::new(),
359 after_tool_callbacks_full: Vec::new(),
360 default_retry_budget: None,
361 tool_retry_budgets: std::collections::HashMap::new(),
362 circuit_breaker_threshold: None,
363 tool_confirmation_policy: ToolConfirmationPolicy::Never,
364 tool_execution_strategy: None,
365 input_guardrails: GuardrailSet::new(),
366 output_guardrails: GuardrailSet::new(),
367 #[cfg(feature = "enhanced-plugins")]
368 enhanced_plugins: Vec::new(),
369 #[cfg(feature = "sandbox")]
370 sandbox_config: None,
371 }
372 }
373
374 pub fn description(mut self, desc: impl Into<String>) -> Self {
376 self.description = Some(desc.into());
377 self
378 }
379
380 pub fn model(mut self, model: Arc<dyn Llm>) -> Self {
382 self.model = Some(model);
383 self
384 }
385
386 pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
388 self.instruction = Some(instruction.into());
389 self
390 }
391
392 pub fn instruction_provider(mut self, provider: InstructionProvider) -> Self {
394 self.instruction_provider = Some(Arc::new(provider));
395 self
396 }
397
398 pub fn global_instruction(mut self, instruction: impl Into<String>) -> Self {
400 self.global_instruction = Some(instruction.into());
401 self
402 }
403
404 pub fn global_instruction_provider(mut self, provider: GlobalInstructionProvider) -> Self {
406 self.global_instruction_provider = Some(Arc::new(provider));
407 self
408 }
409
410 #[cfg(feature = "skills")]
412 pub fn with_skills(mut self, index: SkillIndex) -> Self {
413 self.skills_index = Some(Arc::new(index));
414 self
415 }
416
417 #[cfg(feature = "skills")]
419 pub fn with_auto_skills(self) -> Result<Self> {
420 self.with_skills_from_root(".")
421 }
422
423 #[cfg(feature = "skills")]
425 pub fn with_skills_from_root(mut self, root: impl AsRef<std::path::Path>) -> Result<Self> {
426 let index = load_skill_index(root).map_err(|e| adk_core::AdkError::agent(e.to_string()))?;
427 self.skills_index = Some(Arc::new(index));
428 Ok(self)
429 }
430
431 #[cfg(feature = "skills")]
433 pub fn with_skill_policy(mut self, policy: SelectionPolicy) -> Self {
434 self.skill_policy = policy;
435 self
436 }
437
438 #[cfg(feature = "skills")]
440 pub fn with_skill_budget(mut self, max_chars: usize) -> Self {
441 self.max_skill_chars = max_chars;
442 self
443 }
444
445 pub fn input_schema(mut self, schema: serde_json::Value) -> Self {
447 self.input_schema = Some(schema);
448 self
449 }
450
451 pub fn output_schema(mut self, schema: serde_json::Value) -> Self {
453 self.output_schema = Some(schema);
454 self
455 }
456
457 pub fn output_type<T: schemars::JsonSchema>(mut self) -> Self {
480 let schema = schemars::schema_for!(T);
481 self.output_schema =
482 Some(serde_json::to_value(schema).expect("schema serialization cannot fail"));
483 self
484 }
485
486 pub fn output_max_retries(mut self, n: usize) -> Self {
491 self.output_max_retries = n;
492 self
493 }
494
495 pub fn disallow_transfer_to_parent(mut self, disallow: bool) -> Self {
497 self.disallow_transfer_to_parent = disallow;
498 self
499 }
500
501 pub fn disallow_transfer_to_peers(mut self, disallow: bool) -> Self {
503 self.disallow_transfer_to_peers = disallow;
504 self
505 }
506
507 pub fn include_contents(mut self, include: adk_core::IncludeContents) -> Self {
509 self.include_contents = include;
510 self
511 }
512
513 pub fn output_key(mut self, key: impl Into<String>) -> Self {
515 self.output_key = Some(key.into());
516 self
517 }
518
519 pub fn generate_content_config(mut self, config: adk_core::GenerateContentConfig) -> Self {
540 self.generate_content_config = Some(config);
541 self
542 }
543
544 pub fn temperature(mut self, temperature: f32) -> Self {
547 self.generate_content_config
548 .get_or_insert(adk_core::GenerateContentConfig::default())
549 .temperature = Some(temperature);
550 self
551 }
552
553 pub fn top_p(mut self, top_p: f32) -> Self {
555 self.generate_content_config
556 .get_or_insert(adk_core::GenerateContentConfig::default())
557 .top_p = Some(top_p);
558 self
559 }
560
561 pub fn top_k(mut self, top_k: i32) -> Self {
563 self.generate_content_config
564 .get_or_insert(adk_core::GenerateContentConfig::default())
565 .top_k = Some(top_k);
566 self
567 }
568
569 pub fn max_output_tokens(mut self, max_tokens: i32) -> Self {
571 self.generate_content_config
572 .get_or_insert(adk_core::GenerateContentConfig::default())
573 .max_output_tokens = Some(max_tokens);
574 self
575 }
576
577 pub fn max_iterations(mut self, max: u32) -> Self {
580 self.max_iterations = max;
581 self
582 }
583
584 pub fn tool_timeout(mut self, timeout: std::time::Duration) -> Self {
587 self.tool_timeout = timeout;
588 self
589 }
590
591 pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
593 self.tools.push(tool);
594 self
595 }
596
597 pub fn toolset(mut self, toolset: Arc<dyn Toolset>) -> Self {
603 self.toolsets.push(toolset);
604 self
605 }
606
607 pub fn sub_agent(mut self, agent: Arc<dyn Agent>) -> Self {
609 self.sub_agents.push(agent);
610 self
611 }
612
613 pub fn before_callback(mut self, callback: BeforeAgentCallback) -> Self {
615 self.before_callbacks.push(callback);
616 self
617 }
618
619 pub fn after_callback(mut self, callback: AfterAgentCallback) -> Self {
621 self.after_callbacks.push(callback);
622 self
623 }
624
625 pub fn before_model_callback(mut self, callback: BeforeModelCallback) -> Self {
627 self.before_model_callbacks.push(callback);
628 self
629 }
630
631 pub fn after_model_callback(mut self, callback: AfterModelCallback) -> Self {
633 self.after_model_callbacks.push(callback);
634 self
635 }
636
637 pub fn before_tool_callback(mut self, callback: BeforeToolCallback) -> Self {
639 self.before_tool_callbacks.push(callback);
640 self
641 }
642
643 pub fn after_tool_callback(mut self, callback: AfterToolCallback) -> Self {
645 self.after_tool_callbacks.push(callback);
646 self
647 }
648
649 pub fn after_tool_callback_full(mut self, callback: AfterToolCallbackFull) -> Self {
664 self.after_tool_callbacks_full.push(callback);
665 self
666 }
667
668 pub fn on_tool_error(mut self, callback: OnToolErrorCallback) -> Self {
676 self.on_tool_error_callbacks.push(callback);
677 self
678 }
679
680 pub fn default_retry_budget(mut self, budget: RetryBudget) -> Self {
687 self.default_retry_budget = Some(budget);
688 self
689 }
690
691 pub fn tool_retry_budget(mut self, tool_name: impl Into<String>, budget: RetryBudget) -> Self {
696 self.tool_retry_budgets.insert(tool_name.into(), budget);
697 self
698 }
699
700 pub fn circuit_breaker_threshold(mut self, threshold: u32) -> Self {
707 self.circuit_breaker_threshold = Some(threshold);
708 self
709 }
710
711 pub fn tool_confirmation_policy(mut self, policy: ToolConfirmationPolicy) -> Self {
713 self.tool_confirmation_policy = policy;
714 self
715 }
716
717 pub fn require_tool_confirmation(mut self, tool_name: impl Into<String>) -> Self {
719 self.tool_confirmation_policy = self.tool_confirmation_policy.with_tool(tool_name);
720 self
721 }
722
723 pub fn require_tool_confirmation_for_all(mut self) -> Self {
725 self.tool_confirmation_policy = ToolConfirmationPolicy::Always;
726 self
727 }
728
729 pub fn tool_execution_strategy(mut self, strategy: ToolExecutionStrategy) -> Self {
735 self.tool_execution_strategy = Some(strategy);
736 self
737 }
738
739 pub fn input_guardrails(mut self, guardrails: GuardrailSet) -> Self {
748 self.input_guardrails = guardrails;
749 self
750 }
751
752 pub fn output_guardrails(mut self, guardrails: GuardrailSet) -> Self {
761 self.output_guardrails = guardrails;
762 self
763 }
764
765 #[cfg(feature = "enhanced-plugins")]
785 pub fn enhanced_plugin(mut self, plugin: Arc<dyn EnhancedPlugin>) -> Self {
786 self.enhanced_plugins.push(plugin);
787 self
788 }
789
790 #[cfg(feature = "enhanced-plugins")]
812 pub fn enhanced_plugins(mut self, plugins: Vec<Arc<dyn EnhancedPlugin>>) -> Self {
813 self.enhanced_plugins.extend(plugins);
814 self
815 }
816
817 #[cfg(feature = "sandbox")]
852 pub fn sandbox_config(mut self, config: adk_sandbox::workspace::SandboxConfig) -> Self {
853 self.sandbox_config = Some(config);
854 self
855 }
856
857 pub fn build(self) -> Result<LlmAgent> {
859 let model = self.model.ok_or_else(|| adk_core::AdkError::agent("Model is required"))?;
860
861 let mut seen_names = std::collections::HashSet::new();
862 for agent in &self.sub_agents {
863 if !seen_names.insert(agent.name()) {
864 return Err(adk_core::AdkError::agent(format!(
865 "Duplicate sub-agent name: {}",
866 agent.name()
867 )));
868 }
869 }
870
871 #[cfg(feature = "sandbox")]
874 if let Some(ref sandbox_cfg) = self.sandbox_config {
875 use adk_sandbox::workspace::Capability;
876 if model.uses_interactions_api()
877 && (sandbox_cfg.capabilities.contains(&Capability::Shell)
878 || sandbox_cfg.capabilities.contains(&Capability::Filesystem))
879 {
880 return Err(adk_core::AdkError::new(
881 adk_core::ErrorComponent::Agent,
882 adk_core::ErrorCategory::InvalidInput,
883 "code.gemini_interactions_conflict",
884 "Cannot combine Gemini Interactions API (server-managed environment) \
885 with client-side sandbox tools (Shell/Filesystem). These provide \
886 competing filesystems and would produce nondeterministic behavior. \
887 Either disable use_interactions_api or remove sandbox capabilities.",
888 ));
889 }
890 }
891
892 #[cfg(feature = "enhanced-plugins")]
894 let enhanced_plugin_manager = if self.enhanced_plugins.is_empty() {
895 None
896 } else {
897 Some(Arc::new(EnhancedPluginManager::new(self.enhanced_plugins)))
898 };
899
900 Ok(LlmAgent {
901 name: self.name,
902 description: self.description.unwrap_or_default(),
903 model,
904 instruction: self.instruction,
905 instruction_provider: self.instruction_provider,
906 global_instruction: self.global_instruction,
907 global_instruction_provider: self.global_instruction_provider,
908 skills_index: self.skills_index,
909 skill_policy: self.skill_policy,
910 max_skill_chars: self.max_skill_chars,
911 input_schema: self.input_schema,
912 output_schema: self.output_schema,
913 output_max_retries: self.output_max_retries,
914 disallow_transfer_to_parent: self.disallow_transfer_to_parent,
915 disallow_transfer_to_peers: self.disallow_transfer_to_peers,
916 include_contents: self.include_contents,
917 tools: self.tools,
918 toolsets: self.toolsets,
919 sub_agents: self.sub_agents,
920 output_key: self.output_key,
921 generate_content_config: self.generate_content_config,
922 max_iterations: self.max_iterations,
923 tool_timeout: self.tool_timeout,
924 before_callbacks: Arc::new(self.before_callbacks),
925 after_callbacks: Arc::new(self.after_callbacks),
926 before_model_callbacks: Arc::new(self.before_model_callbacks),
927 after_model_callbacks: Arc::new(self.after_model_callbacks),
928 before_tool_callbacks: Arc::new(self.before_tool_callbacks),
929 after_tool_callbacks: Arc::new(self.after_tool_callbacks),
930 on_tool_error_callbacks: Arc::new(self.on_tool_error_callbacks),
931 after_tool_callbacks_full: Arc::new(self.after_tool_callbacks_full),
932 default_retry_budget: self.default_retry_budget,
933 tool_retry_budgets: self.tool_retry_budgets,
934 circuit_breaker_threshold: self.circuit_breaker_threshold,
935 tool_confirmation_policy: self.tool_confirmation_policy,
936 tool_execution_strategy: self.tool_execution_strategy,
937 input_guardrails: Arc::new(self.input_guardrails),
938 output_guardrails: Arc::new(self.output_guardrails),
939 #[cfg(feature = "enhanced-plugins")]
940 enhanced_plugin_manager,
941 #[cfg(feature = "sandbox")]
942 sandbox_config: self.sandbox_config,
943 })
944 }
945}
946
/// Context handed to a tool invocation.
///
/// Wraps the parent invocation context, remembers the id of the function call
/// being served, and keeps that call's `EventActions` behind a mutex.
struct AgentToolContext {
    /// Invocation context that read-only and callback queries are forwarded to.
    parent_ctx: Arc<dyn InvocationContext>,
    /// Id of the function call this context belongs to.
    function_call_id: String,
    /// Actions recorded for this call; read and replaced through `ToolContext`.
    actions: Mutex<EventActions>,
}
954
955impl AgentToolContext {
956 fn new(parent_ctx: Arc<dyn InvocationContext>, function_call_id: String) -> Self {
957 Self { parent_ctx, function_call_id, actions: Mutex::new(EventActions::default()) }
958 }
959
960 fn actions_guard(&self) -> std::sync::MutexGuard<'_, EventActions> {
961 self.actions.lock().unwrap_or_else(|e| e.into_inner())
962 }
963}
964
// Every read-only query is forwarded unchanged to the parent invocation context.
#[async_trait]
impl ReadonlyContext for AgentToolContext {
    /// Invocation id of the parent context.
    fn invocation_id(&self) -> &str {
        self.parent_ctx.invocation_id()
    }

    /// Agent name reported by the parent context.
    fn agent_name(&self) -> &str {
        self.parent_ctx.agent_name()
    }

    /// User id reported by the parent context.
    fn user_id(&self) -> &str {
        self.parent_ctx.user_id()
    }

    /// Application name reported by the parent context.
    fn app_name(&self) -> &str {
        self.parent_ctx.app_name()
    }

    /// Session id reported by the parent context.
    fn session_id(&self) -> &str {
        self.parent_ctx.session_id()
    }

    /// Branch reported by the parent context.
    fn branch(&self) -> &str {
        self.parent_ctx.branch()
    }

    /// User content held by the parent context.
    fn user_content(&self) -> &Content {
        self.parent_ctx.user_content()
    }
}
998
// Callback-level resources are taken from the parent invocation context.
#[async_trait]
impl CallbackContext for AgentToolContext {
    /// Artifacts handle of the parent context, if it has one.
    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
        self.parent_ctx.artifacts()
    }

    /// Shared state of the parent context, if it has one.
    fn shared_state(&self) -> Option<Arc<adk_core::SharedState>> {
        self.parent_ctx.shared_state()
    }
}
1010
1011#[async_trait]
1012impl ToolContext for AgentToolContext {
1013 fn function_call_id(&self) -> &str {
1014 &self.function_call_id
1015 }
1016
1017 fn actions(&self) -> EventActions {
1018 self.actions_guard().clone()
1019 }
1020
1021 fn set_actions(&self, actions: EventActions) {
1022 *self.actions_guard() = actions;
1023 }
1024
1025 async fn search_memory(&self, query: &str) -> Result<Vec<MemoryEntry>> {
1026 if let Some(memory) = self.parent_ctx.memory() {
1028 memory.search(query).await
1029 } else {
1030 Ok(vec![])
1031 }
1032 }
1033
1034 fn user_scopes(&self) -> Vec<String> {
1035 self.parent_ctx.user_scopes()
1036 }
1037
1038 async fn get_secret(&self, name: &str) -> Result<Option<String>> {
1039 self.parent_ctx.get_secret(name).await
1040 }
1041}
1042
/// Callback context that wraps another `CallbackContext` and additionally
/// carries the `ToolOutcome` reported through `CallbackContext::tool_outcome`.
struct ToolOutcomeCallbackContext {
    /// Wrapped context that the delegated queries are forwarded to.
    inner: Arc<dyn CallbackContext>,
    /// Outcome returned (cloned) by `tool_outcome`.
    outcome: ToolOutcome,
}
1050
// Every read-only query is forwarded unchanged to the wrapped context.
#[async_trait]
impl ReadonlyContext for ToolOutcomeCallbackContext {
    /// Invocation id of the wrapped context.
    fn invocation_id(&self) -> &str {
        self.inner.invocation_id()
    }

    /// Agent name reported by the wrapped context.
    fn agent_name(&self) -> &str {
        self.inner.agent_name()
    }

    /// User id reported by the wrapped context.
    fn user_id(&self) -> &str {
        self.inner.user_id()
    }

    /// Application name reported by the wrapped context.
    fn app_name(&self) -> &str {
        self.inner.app_name()
    }

    /// Session id reported by the wrapped context.
    fn session_id(&self) -> &str {
        self.inner.session_id()
    }

    /// Branch reported by the wrapped context.
    fn branch(&self) -> &str {
        self.inner.branch()
    }

    /// User content held by the wrapped context.
    fn user_content(&self) -> &Content {
        self.inner.user_content()
    }
}
1081
1082#[async_trait]
1083impl CallbackContext for ToolOutcomeCallbackContext {
1084 fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
1085 self.inner.artifacts()
1086 }
1087
1088 fn tool_outcome(&self) -> Option<ToolOutcome> {
1089 Some(self.outcome.clone())
1090 }
1091}
1092
1093struct CircuitBreakerState {
1103 threshold: u32,
1104 failures: std::collections::HashMap<String, u32>,
1106}
1107
1108impl CircuitBreakerState {
1109 fn new(threshold: u32) -> Self {
1110 Self { threshold, failures: std::collections::HashMap::new() }
1111 }
1112
1113 fn is_open(&self, tool_name: &str) -> bool {
1115 self.failures.get(tool_name).copied().unwrap_or(0) >= self.threshold
1116 }
1117
1118 fn record(&mut self, outcome: &ToolOutcome) {
1120 if outcome.success {
1121 self.failures.remove(&outcome.tool_name);
1122 } else {
1123 let count = self.failures.entry(outcome.tool_name.clone()).or_insert(0);
1124 *count += 1;
1125 }
1126 }
1127}
1128
1129#[async_trait]
1130impl Agent for LlmAgent {
/// Returns the name this agent was built with.
fn name(&self) -> &str {
    &self.name
}
1134
/// Returns the agent's description (an empty string when none was set on the builder).
fn description(&self) -> &str {
    &self.description
}
1138
/// Returns the sub-agents registered on this agent.
fn sub_agents(&self) -> &[Arc<dyn Agent>] {
    &self.sub_agents
}
1142
1143 #[adk_telemetry::instrument(
1144 skip(self, ctx),
1145 fields(
1146 agent.name = %self.name,
1147 agent.description = %self.description,
1148 invocation.id = %ctx.invocation_id(),
1149 user.id = %ctx.user_id(),
1150 session.id = %ctx.session_id()
1151 )
1152 )]
1153 async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<adk_core::EventStream> {
1154 adk_telemetry::info!("Starting agent execution");
1155 let ctx = Self::apply_input_guardrails(ctx, self.input_guardrails.clone()).await?;
1156
1157 let agent_name = self.name.clone();
1158 let invocation_id = ctx.invocation_id().to_string();
1159 let model = self.model.clone();
1160 let tools = self.tools.clone();
1161 let toolsets = self.toolsets.clone();
1162 let sub_agents = self.sub_agents.clone();
1163
1164 let instruction = self.instruction.clone();
1165 let instruction_provider = self.instruction_provider.clone();
1166 let global_instruction = self.global_instruction.clone();
1167 let global_instruction_provider = self.global_instruction_provider.clone();
1168 let skills_index = self.skills_index.clone();
1169 let skill_policy = self.skill_policy.clone();
1170 let max_skill_chars = self.max_skill_chars;
1171 let output_key = self.output_key.clone();
1172 let output_schema = self.output_schema.clone();
1173 let output_max_retries = self.output_max_retries;
1174 let generate_content_config = self.generate_content_config.clone();
1175 let include_contents = self.include_contents;
1176 let max_iterations = self.max_iterations;
1177 let tool_timeout = self.tool_timeout;
1178 let before_agent_callbacks = self.before_callbacks.clone();
1180 let after_agent_callbacks = self.after_callbacks.clone();
1181 let before_model_callbacks = self.before_model_callbacks.clone();
1182 let after_model_callbacks = self.after_model_callbacks.clone();
1183 let before_tool_callbacks = self.before_tool_callbacks.clone();
1184 let after_tool_callbacks = self.after_tool_callbacks.clone();
1185 let on_tool_error_callbacks = self.on_tool_error_callbacks.clone();
1186 let after_tool_callbacks_full = self.after_tool_callbacks_full.clone();
1187 let default_retry_budget = self.default_retry_budget.clone();
1188 let tool_retry_budgets = self.tool_retry_budgets.clone();
1189 let circuit_breaker_threshold = self.circuit_breaker_threshold;
1190 let tool_confirmation_policy = self.tool_confirmation_policy.clone();
1191 let disallow_transfer_to_parent = self.disallow_transfer_to_parent;
1192 let disallow_transfer_to_peers = self.disallow_transfer_to_peers;
1193 let output_guardrails = self.output_guardrails.clone();
1194 let agent_tool_execution_strategy = self.tool_execution_strategy;
1195 #[cfg(feature = "enhanced-plugins")]
1196 let enhanced_plugin_manager = self.enhanced_plugin_manager.clone();
1197
1198 let s = stream! {
1199 for callback in before_agent_callbacks.as_ref() {
1203 match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1204 Ok(Some(content)) => {
1205 let mut early_event = Event::new(&invocation_id);
1207 early_event.author = agent_name.clone();
1208 early_event.llm_response.content = Some(content);
1209 yield Ok(early_event);
1210
1211 for after_callback in after_agent_callbacks.as_ref() {
1213 match after_callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1214 Ok(Some(after_content)) => {
1215 let mut after_event = Event::new(&invocation_id);
1216 after_event.author = agent_name.clone();
1217 after_event.llm_response.content = Some(after_content);
1218 yield Ok(after_event);
1219 return;
1220 }
1221 Ok(None) => continue,
1222 Err(e) => {
1223 yield Err(e);
1224 return;
1225 }
1226 }
1227 }
1228 return;
1229 }
1230 Ok(None) => {
1231 continue;
1233 }
1234 Err(e) => {
1235 yield Err(e);
1237 return;
1238 }
1239 }
1240 }
1241
1242 let mut prompt_preamble = Vec::new();
1244
1245 if let Some(index) = &skills_index {
1249 let user_query = ctx
1250 .user_content()
1251 .parts
1252 .iter()
1253 .filter_map(|part| match part {
1254 Part::Text { text } => Some(text.as_str()),
1255 _ => None,
1256 })
1257 .collect::<Vec<_>>()
1258 .join("\n");
1259
1260 if let Some((_matched, skill_block)) = select_skill_prompt_block(
1261 index.as_ref(),
1262 &user_query,
1263 &skill_policy,
1264 max_skill_chars,
1265 ) {
1266 prompt_preamble.push(Content {
1267 role: "user".to_string(),
1268 parts: vec![Part::Text { text: skill_block }],
1269 });
1270 }
1271 }
1272
1273 if let Some(provider) = &global_instruction_provider {
1276 let global_inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
1278 if !global_inst.is_empty() {
1279 prompt_preamble.push(Content {
1280 role: "user".to_string(),
1281 parts: vec![Part::Text { text: global_inst }],
1282 });
1283 }
1284 } else if let Some(ref template) = global_instruction {
1285 let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
1287 if !processed.is_empty() {
1288 prompt_preamble.push(Content {
1289 role: "user".to_string(),
1290 parts: vec![Part::Text { text: processed }],
1291 });
1292 }
1293 }
1294
1295 if let Some(provider) = &instruction_provider {
1298 let inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
1300 if !inst.is_empty() {
1301 prompt_preamble.push(Content {
1302 role: "user".to_string(),
1303 parts: vec![Part::Text { text: inst }],
1304 });
1305 }
1306 } else if let Some(ref template) = instruction {
1307 let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
1309 if !processed.is_empty() {
1310 prompt_preamble.push(Content {
1311 role: "user".to_string(),
1312 parts: vec![Part::Text { text: processed }],
1313 });
1314 }
1315 }
1316
1317 if let Some(ref schema) = output_schema {
1321 let schema_instruction = format!(
1322 "You MUST respond with valid JSON conforming to this schema: {}. Do not include any text outside the JSON object.",
1323 schema
1324 );
1325 prompt_preamble.push(Content {
1326 role: "user".to_string(),
1327 parts: vec![Part::Text { text: schema_instruction }],
1328 });
1329 }
1330
1331 let session_history = if !ctx.run_config().transfer_targets.is_empty() {
1337 ctx.session().conversation_history_for_agent(&agent_name)
1338 } else {
1339 ctx.session().conversation_history()
1340 };
1341 let mut session_history = session_history;
1342 let current_user_content = ctx.user_content().clone();
1343 if let Some(index) = session_history.iter().rposition(|content| content.role == "user") {
1344 session_history[index] = current_user_content.clone();
1345 } else {
1346 session_history.push(current_user_content.clone());
1347 }
1348
1349 let mut conversation_history = match include_contents {
1352 adk_core::IncludeContents::None => {
1353 let mut filtered = prompt_preamble.clone();
1354 filtered.push(current_user_content);
1355 filtered
1356 }
1357 adk_core::IncludeContents::Default => {
1358 let mut full_history = prompt_preamble;
1359 full_history.extend(session_history);
1360 full_history
1361 }
1362 };
1363
1364 let mut resolved_tools: Vec<Arc<dyn Tool>> = tools.clone();
1367 let static_tool_names: std::collections::HashSet<String> =
1368 tools.iter().map(|t| t.name().to_string()).collect();
1369
1370 let mut toolset_source: std::collections::HashMap<String, String> =
1372 std::collections::HashMap::new();
1373
1374 for toolset in &toolsets {
1375 let toolset_tools = match toolset
1376 .tools(ctx.clone() as Arc<dyn ReadonlyContext>)
1377 .await
1378 {
1379 Ok(t) => t,
1380 Err(e) => {
1381 yield Err(e);
1382 return;
1383 }
1384 };
1385 for tool in &toolset_tools {
1386 let name = tool.name().to_string();
1387 if static_tool_names.contains(&name) {
1389 yield Err(adk_core::AdkError::agent(format!(
1390 "Duplicate tool name '{name}': conflict between static tool and toolset '{}'",
1391 toolset.name()
1392 )));
1393 return;
1394 }
1395 if let Some(other_toolset_name) = toolset_source.get(&name) {
1397 yield Err(adk_core::AdkError::agent(format!(
1398 "Duplicate tool name '{name}': conflict between toolset '{}' and toolset '{}'",
1399 other_toolset_name,
1400 toolset.name()
1401 )));
1402 return;
1403 }
1404 toolset_source.insert(name, toolset.name().to_string());
1405 resolved_tools.push(tool.clone());
1406 }
1407 }
1408
1409 let tool_map: std::collections::HashMap<String, Arc<dyn Tool>> = resolved_tools
1411 .iter()
1412 .map(|t| (t.name().to_string(), t.clone()))
1413 .collect();
1414
1415 let collect_long_running_ids = |content: &Content| -> Vec<String> {
1417 content.parts.iter()
1418 .filter_map(|p| {
1419 if let Part::FunctionCall { name, .. } = p
1420 && let Some(tool) = tool_map.get(name)
1421 && tool.is_long_running()
1422 {
1423 return Some(name.clone());
1424 }
1425 None
1426 })
1427 .collect()
1428 };
1429
1430 let mut tool_declarations = std::collections::HashMap::new();
1435 for tool in &resolved_tools {
1436 tool_declarations.insert(tool.name().to_string(), tool.declaration());
1437 }
1438
1439 let mut valid_transfer_targets: Vec<String> = sub_agents
1444 .iter()
1445 .map(|a| a.name().to_string())
1446 .collect();
1447
1448 let run_config_targets = &ctx.run_config().transfer_targets;
1450 let parent_agent_name = ctx.run_config().parent_agent.clone();
1451 let sub_agent_names: std::collections::HashSet<&str> = sub_agents
1452 .iter()
1453 .map(|a| a.name())
1454 .collect();
1455
1456 for target in run_config_targets {
1457 if sub_agent_names.contains(target.as_str()) {
1459 continue;
1460 }
1461
1462 let is_parent = parent_agent_name.as_deref() == Some(target.as_str());
1464 if is_parent && disallow_transfer_to_parent {
1465 continue;
1466 }
1467 if !is_parent && disallow_transfer_to_peers {
1468 continue;
1469 }
1470
1471 valid_transfer_targets.push(target.clone());
1472 }
1473
1474 if !valid_transfer_targets.is_empty() {
1476 let transfer_tool_name = "transfer_to_agent";
1477 let transfer_tool_decl = serde_json::json!({
1478 "name": transfer_tool_name,
1479 "description": format!(
1480 "Transfer execution to another agent. Valid targets: {}",
1481 valid_transfer_targets.join(", ")
1482 ),
1483 "parameters": {
1484 "type": "object",
1485 "properties": {
1486 "agent_name": {
1487 "type": "string",
1488 "description": "The name of the agent to transfer to.",
1489 "enum": valid_transfer_targets
1490 }
1491 },
1492 "required": ["agent_name"]
1493 }
1494 });
1495 tool_declarations.insert(transfer_tool_name.to_string(), transfer_tool_decl);
1496 }
1497
1498
1499 let mut circuit_breaker_state = circuit_breaker_threshold.map(CircuitBreakerState::new);
1502
1503 let mut last_interaction_id: Option<String> = None;
1512
1513 let mut iteration = 0;
1515 let mut schema_retry_count: usize = 0;
1516
1517 loop {
1518 iteration += 1;
1519 if iteration > max_iterations {
1520 yield Err(adk_core::AdkError::agent(
1521 format!("Max iterations ({max_iterations}) exceeded")
1522 ));
1523 return;
1524 }
1525
1526 let config = match (&generate_content_config, &output_schema) {
1533 (Some(base), Some(schema)) => {
1534 let mut merged = base.clone();
1535 merged.response_schema = Some(schema.clone());
1536 Some(merged)
1537 }
1538 (Some(base), None) => Some(base.clone()),
1539 (None, Some(schema)) => Some(adk_core::GenerateContentConfig {
1540 response_schema: Some(schema.clone()),
1541 ..Default::default()
1542 }),
1543 (None, None) => None,
1544 };
1545
1546 let config = if let Some(ref cached) = ctx.run_config().cached_content {
1548 let mut cfg = config.unwrap_or_default();
1549 if cfg.cached_content.is_none() {
1551 cfg.cached_content = Some(cached.clone());
1552 }
1553 Some(cfg)
1554 } else {
1555 config
1556 };
1557
1558 let request = LlmRequest {
1559 model: model.name().to_string(),
1560 contents: conversation_history.clone(),
1561 tools: tool_declarations.clone(),
1562 config,
1563 previous_response_id: last_interaction_id.clone(),
1570 };
1571
1572 #[cfg(feature = "enhanced-plugins")]
1576 let (request, model_response_override_from_plugin) = {
1577 if let Some(epm) = &enhanced_plugin_manager {
1578 match epm.run_before_model_call(request, ctx.clone() as Arc<dyn CallbackContext>).await {
1579 Ok(BeforeModelCallResult::Continue(modified_request)) => {
1580 (modified_request, None)
1581 }
1582 Ok(BeforeModelCallResult::ShortCircuit(response)) => {
1583 (LlmRequest::new("", vec![]), Some(response))
1585 }
1586 Err(e) => {
1587 yield Err(e);
1588 return;
1589 }
1590 }
1591 } else {
1592 (request, None)
1593 }
1594 };
1595 #[cfg(not(feature = "enhanced-plugins"))]
1596 let model_response_override_from_plugin: Option<LlmResponse> = None;
1597
1598 let mut current_request = request;
1601 let mut model_response_override = model_response_override_from_plugin;
1602 if model_response_override.is_none() {
1603 for callback in before_model_callbacks.as_ref() {
1604 match callback(ctx.clone() as Arc<dyn CallbackContext>, current_request.clone()).await {
1605 Ok(BeforeModelResult::Continue(modified_request)) => {
1606 current_request = modified_request;
1608 }
1609 Ok(BeforeModelResult::Skip(response)) => {
1610 model_response_override = Some(response);
1612 break;
1613 }
1614 Err(e) => {
1615 yield Err(e);
1617 return;
1618 }
1619 }
1620 }
1621 }
1622 let request = current_request;
1623
1624 let mut accumulated_content: Option<Content> = None;
1626 let mut final_provider_metadata: Option<serde_json::Value> = None;
1627
1628 if let Some(cached_response) = model_response_override {
1629 accumulated_content = cached_response.content.clone();
1632 final_provider_metadata = cached_response.provider_metadata.clone();
1633 normalize_option_content(&mut accumulated_content);
1634 if let Some(content) = accumulated_content.take() {
1635 let has_function_calls = content
1636 .parts
1637 .iter()
1638 .any(|part| matches!(part, Part::FunctionCall { .. }));
1639 let content = if has_function_calls {
1640 content
1641 } else {
1642 Self::apply_output_guardrails(output_guardrails.as_ref(), content).await?
1643 };
1644 accumulated_content = Some(content);
1645 }
1646
1647 let mut cached_event = Event::new(&invocation_id);
1648 cached_event.author = agent_name.clone();
1649 cached_event.llm_response.content = accumulated_content.clone();
1650 cached_event.llm_response.provider_metadata = cached_response.provider_metadata.clone();
1651 cached_event.llm_response.interaction_id = cached_response.interaction_id.clone();
1653 if cached_response.interaction_id.is_some() {
1654 last_interaction_id = cached_response.interaction_id.clone();
1655 }
1656 cached_event.llm_request = Some(serde_json::to_string(&request).unwrap_or_default());
1657 cached_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), serde_json::to_string(&request).unwrap_or_default());
1658 cached_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(&cached_response).unwrap_or_default());
1659
1660 if let Some(ref content) = accumulated_content {
1662 cached_event.long_running_tool_ids = collect_long_running_ids(content);
1663 }
1664
1665 yield Ok(cached_event);
1666 } else {
1667 let request_json = serde_json::to_string(&request).unwrap_or_default();
1669 let trace_request_json = trace_json_payload(
1670 &request,
1671 ctx.run_config().record_payloads,
1672 ctx.run_config().trace_payload_max_bytes,
1673 );
1674
1675 let llm_ts = std::time::SystemTime::now()
1677 .duration_since(std::time::UNIX_EPOCH)
1678 .unwrap_or_default()
1679 .as_nanos();
1680 let llm_event_id = format!("{}_llm_{}", invocation_id, llm_ts);
1681 let llm_span = tracing::info_span!(
1682 "call_llm",
1683 "gcp.vertex.agent.event_id" = %llm_event_id,
1684 "gcp.vertex.agent.invocation_id" = %invocation_id,
1685 "gcp.vertex.agent.session_id" = %ctx.session_id(),
1686 "gen_ai.conversation.id" = %ctx.session_id(),
1687 "gcp.vertex.agent.llm_request" = %trace_request_json,
1688 "gcp.vertex.agent.llm_response" = tracing::field::Empty );
1690 let _llm_guard = llm_span.enter();
1691
1692 use adk_core::StreamingMode;
1694 let streaming_mode = ctx.run_config().streaming_mode;
1695 let should_stream_to_client = matches!(streaming_mode, StreamingMode::SSE | StreamingMode::Bidi)
1696 && output_guardrails.is_empty();
1697
1698 let mut response_stream = model.generate_content(request, true).await?;
1700
1701 use futures::StreamExt;
1702
1703 let mut last_chunk: Option<LlmResponse> = None;
1705
1706 while let Some(chunk_result) = response_stream.next().await {
1708 let mut chunk = match chunk_result {
1709 Ok(c) => c,
1710 Err(e) => {
1711 yield Err(e);
1712 return;
1713 }
1714 };
1715
1716 for callback in after_model_callbacks.as_ref() {
1719 match callback(ctx.clone() as Arc<dyn CallbackContext>, chunk.clone()).await {
1720 Ok(Some(modified_chunk)) => {
1721 chunk = modified_chunk;
1723 break;
1724 }
1725 Ok(None) => {
1726 continue;
1728 }
1729 Err(e) => {
1730 yield Err(e);
1732 return;
1733 }
1734 }
1735 }
1736
1737 normalize_option_content(&mut chunk.content);
1738
1739 if let Some(chunk_content) = chunk.content.clone() {
1741 if let Some(ref mut acc) = accumulated_content {
1742 acc.parts.extend(chunk_content.parts);
1743 } else {
1744 accumulated_content = Some(chunk_content);
1745 }
1746 }
1747
1748 if should_stream_to_client {
1750 let mut partial_event = Event::with_id(&llm_event_id, &invocation_id);
1751 partial_event.author = agent_name.clone();
1752 partial_event.llm_request = Some(request_json.clone());
1753 partial_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), request_json.clone());
1754 partial_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(&chunk).unwrap_or_default());
1755 partial_event.llm_response.partial = chunk.partial;
1756 partial_event.llm_response.turn_complete = chunk.turn_complete;
1757 partial_event.llm_response.finish_reason = chunk.finish_reason;
1758 partial_event.llm_response.usage_metadata = chunk.usage_metadata.clone();
1759 partial_event.llm_response.content = chunk.content.clone();
1760 partial_event.llm_response.provider_metadata = chunk.provider_metadata.clone();
1761 partial_event.llm_response.interaction_id = chunk.interaction_id.clone();
1762
1763 if let Some(ref content) = chunk.content {
1765 partial_event.long_running_tool_ids = collect_long_running_ids(content);
1766 }
1767
1768 yield Ok(partial_event);
1769 }
1770
1771 if chunk.interaction_id.is_some() {
1775 last_interaction_id = chunk.interaction_id.clone();
1776 }
1777
1778 last_chunk = Some(chunk.clone());
1780
1781 if chunk.turn_complete {
1783 break;
1784 }
1785 }
1786
1787 if !should_stream_to_client {
1789 if let Some(content) = accumulated_content.take() {
1790 let has_function_calls = content
1791 .parts
1792 .iter()
1793 .any(|part| matches!(part, Part::FunctionCall { .. }));
1794 let content = if has_function_calls {
1795 content
1796 } else {
1797 Self::apply_output_guardrails(output_guardrails.as_ref(), content).await?
1798 };
1799 accumulated_content = Some(content);
1800 }
1801
1802 let mut final_event = Event::with_id(&llm_event_id, &invocation_id);
1803 final_event.author = agent_name.clone();
1804 final_event.llm_request = Some(request_json.clone());
1805 final_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), request_json.clone());
1806 final_event.llm_response.content = accumulated_content.clone();
1807 final_event.llm_response.partial = false;
1808 final_event.llm_response.turn_complete = true;
1809
1810 if let Some(ref last) = last_chunk {
1812 final_event.llm_response.finish_reason = last.finish_reason;
1813 final_event.llm_response.usage_metadata = last.usage_metadata.clone();
1814 final_event.llm_response.provider_metadata = last.provider_metadata.clone();
1815 final_event.llm_response.interaction_id = last.interaction_id.clone();
1816 final_provider_metadata = last.provider_metadata.clone();
1817 final_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(last).unwrap_or_default());
1818 }
1819
1820 if let Some(ref content) = accumulated_content {
1822 final_event.long_running_tool_ids = collect_long_running_ids(content);
1823 }
1824
1825 yield Ok(final_event);
1826 }
1827
1828 if let Some(ref content) = accumulated_content {
1830 let response_json = trace_json_payload(
1831 content,
1832 ctx.run_config().record_payloads,
1833 ctx.run_config().trace_payload_max_bytes,
1834 );
1835 llm_span.record("gcp.vertex.agent.llm_response", &response_json);
1836 }
1837 }
1838
1839 #[cfg(feature = "enhanced-plugins")]
1843 if let Some(epm) = &enhanced_plugin_manager {
1844 if let Some(ref content) = accumulated_content {
1845 let response_for_hook = LlmResponse {
1846 content: Some(content.clone()),
1847 provider_metadata: final_provider_metadata.clone(),
1848 ..Default::default()
1849 };
1850 match epm.run_after_model_call(response_for_hook, ctx.clone() as Arc<dyn CallbackContext>).await {
1851 Ok(adk_plugin::AfterModelCallResult::Continue(modified_response)) => {
1852 accumulated_content = modified_response.content;
1853 if modified_response.provider_metadata.is_some() {
1854 final_provider_metadata = modified_response.provider_metadata;
1855 }
1856 }
1857 Err(e) => {
1858 yield Err(e);
1859 return;
1860 }
1861 }
1862 }
1863 }
1864
1865 let function_call_names: Vec<String> = accumulated_content.as_ref()
1867 .map(|c| c.parts.iter()
1868 .filter_map(|p| {
1869 if let Part::FunctionCall { name, .. } = p {
1870 Some(name.clone())
1871 } else {
1872 None
1873 }
1874 })
1875 .collect())
1876 .unwrap_or_default();
1877
1878 let has_function_calls = !function_call_names.is_empty();
1879
1880 let all_calls_are_long_running = has_function_calls && function_call_names.iter().all(|name| {
1884 tool_map.get(name)
1885 .map(|t| t.is_long_running())
1886 .unwrap_or(false)
1887 });
1888
1889 if let Some(ref content) = accumulated_content {
1891 conversation_history.push(Self::augment_content_for_history(
1892 content,
1893 final_provider_metadata.as_ref(),
1894 ));
1895
1896 if let Some(ref output_key) = output_key
1898 && !has_function_calls
1899 {
1900 let mut text_parts = String::new();
1901 for part in &content.parts {
1902 if let Part::Text { text } = part {
1903 text_parts.push_str(text);
1904 }
1905 }
1906 if !text_parts.is_empty() {
1907 let mut state_event = Event::new(&invocation_id);
1909 state_event.author = agent_name.clone();
1910 state_event.actions.state_delta.insert(
1911 output_key.clone(),
1912 serde_json::Value::String(text_parts),
1913 );
1914 yield Ok(state_event);
1915 }
1916 }
1917 }
1918
1919 if !has_function_calls {
1920 if let Some(ref schema) = output_schema {
1925 let text = accumulated_content
1926 .as_ref()
1927 .map(|c| {
1928 c.parts
1929 .iter()
1930 .filter_map(|p| {
1931 if let Part::Text { text } = p {
1932 Some(text.as_str())
1933 } else {
1934 None
1935 }
1936 })
1937 .collect::<Vec<_>>()
1938 .join("")
1939 })
1940 .unwrap_or_default();
1941
1942 if !text.is_empty()
1943 && let Err(validation_error) = validate_output_against_schema(&text, schema)
1944 {
1945 if schema_retry_count >= output_max_retries {
1946 yield Err(adk_core::AdkError::agent(format!(
1947 "output schema validation failed after {} attempts",
1948 output_max_retries
1949 )));
1950 return;
1951 }
1952 schema_retry_count += 1;
1953
1954 let correction = format!(
1956 "Your output did not match the required schema. Error: {}. Please produce valid JSON matching the schema.",
1957 validation_error
1958 );
1959 conversation_history.push(Content {
1960 role: "user".to_string(),
1961 parts: vec![Part::Text { text: correction }],
1962 });
1963 continue;
1964 }
1965 }
1966
1967 if let Some(ref content) = accumulated_content {
1970 let response_json = trace_json_payload(
1971 content,
1972 ctx.run_config().record_payloads,
1973 ctx.run_config().trace_payload_max_bytes,
1974 );
1975 tracing::Span::current().record("gcp.vertex.agent.llm_response", &response_json);
1976 }
1977
1978 tracing::info!(agent.name = %agent_name, "Agent execution complete");
1979 break;
1980 }
1981
1982 if let Some(content) = &accumulated_content {
1984 let strategy = agent_tool_execution_strategy
1987 .unwrap_or(ToolExecutionStrategy::Sequential);
1988
1989 let mut fc_parts: Vec<(usize, String, serde_json::Value, Option<String>, String)> = Vec::new();
1993 {
1994 let mut tci = 0usize;
1995 for part in &content.parts {
1996 if let Part::FunctionCall { name, args, id, .. } = part {
1997 let fallback = format!("{}_{}_{}", invocation_id, name, tci);
1998 let fcid = id.clone().unwrap_or(fallback);
1999 fc_parts.push((tci, name.clone(), args.clone(), id.clone(), fcid));
2000 tci += 1;
2001 }
2002 }
2003 }
2004
2005 let mut transfer_handled = false;
2009 for (_, fc_name, fc_args, fc_id, _) in &fc_parts {
2010 if fc_name == "transfer_to_agent" {
2011 let target_agent = fc_args.get("agent_name")
2012 .and_then(|v| v.as_str())
2013 .unwrap_or_default()
2014 .to_string();
2015
2016 let valid_target = valid_transfer_targets.iter().any(|n| n == &target_agent);
2017 if !valid_target {
2018 let error_content = Content {
2019 role: "function".to_string(),
2020 parts: vec![Part::FunctionResponse {
2021 function_response: FunctionResponseData::new(
2022 fc_name.clone(),
2023 serde_json::json!({
2024 "error": format!(
2025 "Agent '{}' not found. Available agents: {:?}",
2026 target_agent, valid_transfer_targets
2027 )
2028 }),
2029 ),
2030 id: fc_id.clone(),
2031 }],
2032 };
2033 conversation_history.push(error_content.clone());
2034 let mut error_event = Event::new(&invocation_id);
2035 error_event.author = agent_name.clone();
2036 error_event.llm_response.content = Some(error_content);
2037 yield Ok(error_event);
2038 continue;
2039 }
2040
2041 let mut transfer_event = Event::new(&invocation_id);
2042 transfer_event.author = agent_name.clone();
2043 transfer_event.actions.transfer_to_agent = Some(target_agent);
2044 yield Ok(transfer_event);
2045 transfer_handled = true;
2046 break;
2047 }
2048 }
2049 if transfer_handled {
2050 return;
2051 }
2052
2053 let fc_parts: Vec<_> = fc_parts.into_iter().filter(|(_, fc_name, _, _, _)| {
2055 if fc_name == "transfer_to_agent" {
2056 return false;
2057 }
2058 if let Some(tool) = tool_map.get(fc_name)
2059 && tool.is_builtin()
2060 {
2061 adk_telemetry::debug!(tool.name = %fc_name, "skipping built-in tool execution");
2062 return false;
2063 }
2064 true
2065 }).collect();
2066
2067 let mut confirmation_interrupted = false;
2071 for (_, fc_name, fc_args, _, fc_call_id) in &fc_parts {
2072 if tool_confirmation_policy.requires_confirmation(fc_name)
2073 && ctx.run_config().tool_confirmation_decisions.get(fc_name).copied().is_none()
2074 {
2075 let mut ce = Event::new(&invocation_id);
2076 ce.author = agent_name.clone();
2077 ce.llm_response.interrupted = true;
2078 ce.llm_response.turn_complete = true;
2079 ce.llm_response.content = Some(Content {
2080 role: "model".to_string(),
2081 parts: vec![Part::Text {
2082 text: format!(
2083 "Tool confirmation required for '{}'. Provide approve/deny decision to continue.",
2084 fc_name
2085 ),
2086 }],
2087 });
2088 ce.actions.tool_confirmation = Some(ToolConfirmationRequest {
2089 tool_name: fc_name.clone(),
2090 function_call_id: Some(fc_call_id.clone()),
2091 args: fc_args.clone(),
2092 });
2093 yield Ok(ce);
2094 confirmation_interrupted = true;
2095 break;
2096 }
2097 }
2098 if confirmation_interrupted {
2099 return;
2100 }
2101
2102 let cb_mutex = std::sync::Mutex::new(circuit_breaker_state.take());
2104
2105 let concurrency_manager = adk_core::ToolConcurrencyManager::new(
2108 &ctx.run_config().tool_concurrency,
2109 );
2110
2111 let execute_one_tool = |idx: usize, name: String, args: serde_json::Value,
2116 id: Option<String>, function_call_id: String| {
2117 let ctx = ctx.clone();
2118 let tool_map = &tool_map;
2119 let tool_retry_budgets = &tool_retry_budgets;
2120 let default_retry_budget = &default_retry_budget;
2121 let before_tool_callbacks = &before_tool_callbacks;
2122 let after_tool_callbacks = &after_tool_callbacks;
2123 let after_tool_callbacks_full = &after_tool_callbacks_full;
2124 let on_tool_error_callbacks = &on_tool_error_callbacks;
2125 let tool_confirmation_policy = &tool_confirmation_policy;
2126 let cb_mutex = &cb_mutex;
2127 let invocation_id = &invocation_id;
2128 let concurrency_manager = &concurrency_manager;
2129 #[cfg(feature = "enhanced-plugins")]
2130 let enhanced_plugin_manager = &enhanced_plugin_manager;
2131 async move {
2132 let mut tool_actions = EventActions::default();
2133 let mut response_content: Option<Content> = None;
2134 let mut run_after_tool_callbacks = true;
2135 let mut tool_outcome_for_callback: Option<ToolOutcome> = None;
2136 let mut executed_tool: Option<Arc<dyn Tool>> = None;
2137 let mut executed_tool_response: Option<serde_json::Value> = None;
2138
2139 let _concurrency_permit = match concurrency_manager.acquire(&name).await {
2143 Ok(permit) => Some(permit),
2144 Err(e) => {
2145 let error_content = Content {
2147 role: "function".to_string(),
2148 parts: vec![Part::FunctionResponse {
2149 function_response: FunctionResponseData::new(
2150 name.clone(),
2151 serde_json::json!({ "error": e.to_string() }),
2152 ),
2153 id: id.clone(),
2154 }],
2155 };
2156 return (idx, error_content, tool_actions, false);
2157 }
2158 };
2159
2160 if tool_confirmation_policy.requires_confirmation(&name) {
2162 match ctx.run_config().tool_confirmation_decisions.get(&name).copied() {
2163 Some(ToolConfirmationDecision::Approve) => {
2164 tool_actions.tool_confirmation_decision =
2165 Some(ToolConfirmationDecision::Approve);
2166 }
2167 Some(ToolConfirmationDecision::Deny) => {
2168 tool_actions.tool_confirmation_decision =
2169 Some(ToolConfirmationDecision::Deny);
2170 response_content = Some(Content {
2171 role: "function".to_string(),
2172 parts: vec![Part::FunctionResponse {
2173 function_response: FunctionResponseData::new(
2174 name.clone(),
2175 serde_json::json!({
2176 "error": format!("Tool '{}' execution denied by confirmation policy", name)
2177 }),
2178 ),
2179 id: id.clone(),
2180 }],
2181 });
2182 run_after_tool_callbacks = false;
2183 }
2184 None => {
2185 response_content = Some(Content {
2186 role: "function".to_string(),
2187 parts: vec![Part::FunctionResponse {
2188 function_response: FunctionResponseData::new(
2189 name.clone(),
2190 serde_json::json!({
2191 "error": format!("Tool '{}' requires confirmation", name)
2192 }),
2193 ),
2194 id: id.clone(),
2195 }],
2196 });
2197 run_after_tool_callbacks = false;
2198 }
2199 }
2200 }
2201
2202 #[allow(unused_mut)]
2205 let mut final_args = args.clone();
2206
2207 #[cfg(feature = "enhanced-plugins")]
2209 if response_content.is_none() {
2210 if let Some(epm) = &enhanced_plugin_manager {
2211 if let Some(tool_ref) = tool_map.get(&name) {
2212 match epm.run_before_tool_call(
2213 tool_ref.clone(),
2214 final_args.clone(),
2215 ctx.clone() as Arc<dyn CallbackContext>,
2216 ).await {
2217 Ok(BeforeToolCallResult::Continue(modified_args)) => {
2218 final_args = modified_args;
2219 }
2220 Ok(BeforeToolCallResult::ShortCircuit(synthetic_result)) => {
2221 response_content = Some(Content {
2223 role: "function".to_string(),
2224 parts: vec![Part::FunctionResponse {
2225 function_response: FunctionResponseData::from_tool_result(
2226 name.clone(),
2227 synthetic_result,
2228 ),
2229 id: id.clone(),
2230 }],
2231 });
2232 executed_tool = Some(tool_ref.clone());
2233 }
2234 Err(e) => {
2235 response_content = Some(Content {
2236 role: "function".to_string(),
2237 parts: vec![Part::FunctionResponse {
2238 function_response: FunctionResponseData::new(
2239 name.clone(),
2240 serde_json::json!({ "error": e.to_string() }),
2241 ),
2242 id: id.clone(),
2243 }],
2244 });
2245 run_after_tool_callbacks = false;
2246 }
2247 }
2248 }
2249 }
2250 }
2251
2252 if response_content.is_none() {
2253 let tool_ctx = Arc::new(ToolCallbackContext::new(
2254 ctx.clone(),
2255 name.clone(),
2256 final_args.clone(),
2257 ));
2258 for callback in before_tool_callbacks.as_ref() {
2259 match callback(tool_ctx.clone() as Arc<dyn CallbackContext>).await {
2260 Ok(Some(c)) => { response_content = Some(c); break; }
2261 Ok(None) => continue,
2262 Err(e) => {
2263 response_content = Some(Content {
2264 role: "function".to_string(),
2265 parts: vec![Part::FunctionResponse {
2266 function_response: FunctionResponseData::new(
2267 name.clone(),
2268 serde_json::json!({ "error": e.to_string() }),
2269 ),
2270 id: id.clone(),
2271 }],
2272 });
2273 run_after_tool_callbacks = false;
2274 break;
2275 }
2276 }
2277 }
2278 }
2279
2280 if response_content.is_none() {
2282 let guard = cb_mutex.lock().unwrap_or_else(|e| e.into_inner());
2283 if let Some(ref cb_state) = *guard
2284 && cb_state.is_open(&name)
2285 {
2286 let msg = format!(
2287 "Tool '{}' is temporarily disabled after {} consecutive failures",
2288 name, cb_state.threshold
2289 );
2290 tracing::warn!(tool.name = %name, "circuit breaker open, skipping tool execution");
2291 response_content = Some(Content {
2292 role: "function".to_string(),
2293 parts: vec![Part::FunctionResponse {
2294 function_response: FunctionResponseData::new(
2295 name.clone(),
2296 serde_json::json!({ "error": msg }),
2297 ),
2298 id: id.clone(),
2299 }],
2300 });
2301 run_after_tool_callbacks = false;
2302 }
2303 drop(guard);
2304 }
2305
2306 if response_content.is_none() {
2308 if let Some(tool) = tool_map.get(&name) {
2309 let tool_ctx: Arc<dyn ToolContext> = Arc::new(
2310 AgentToolContext::new(ctx.clone(), function_call_id.clone()),
2311 );
2312 let span_name = format!("execute_tool {name}");
2313 let tool_span = tracing::info_span!(
2314 "",
2315 otel.name = %span_name,
2316 tool.name = %name,
2317 "gcp.vertex.agent.event_id" = %format!("{}_{}", invocation_id, name),
2318 "gcp.vertex.agent.invocation_id" = %invocation_id,
2319 "gcp.vertex.agent.session_id" = %ctx.session_id(),
2320 "gen_ai.conversation.id" = %ctx.session_id()
2321 );
2322
2323 let budget = tool_retry_budgets.get(&name)
2324 .or(default_retry_budget.as_ref());
2325 let max_attempts = budget.map(|b| b.max_retries + 1).unwrap_or(1);
2326 let retry_delay = budget.map(|b| b.delay).unwrap_or_default();
2327
2328 let tool_clone = tool.clone();
2329 let tool_start = std::time::Instant::now();
2330 let mut last_error = String::new();
2331 let mut final_attempt: u32 = 0;
2332 let mut retry_result: Option<serde_json::Value> = None;
2333
2334 for attempt in 0..max_attempts {
2335 final_attempt = attempt;
2336 if attempt > 0 {
2337 tokio::time::sleep(retry_delay).await;
2338 }
2339 match async {
2340 let args_payload = trace_json_payload(
2341 &final_args,
2342 ctx.run_config().record_payloads,
2343 ctx.run_config().trace_payload_max_bytes,
2344 );
2345 tracing::debug!(tool.name = %name, tool.args = %args_payload, attempt = attempt, "tool_call");
2346 let exec_future = tool_clone.execute(tool_ctx.clone(), final_args.clone());
2347 let unwind_safe_future = std::panic::AssertUnwindSafe(
2348 tokio::time::timeout(tool_timeout, exec_future)
2349 );
2350 match futures::FutureExt::catch_unwind(unwind_safe_future).await {
2351 Ok(result) => result,
2352 Err(_panic) => Ok(Err(adk_core::AdkError::tool(
2353 format!("tool '{}' panicked during execution", name),
2354 ))),
2355 }
2356 }.instrument(tool_span.clone()).await {
2357 Ok(Ok(value)) => {
2358 let result_payload = trace_json_payload(
2359 &value,
2360 ctx.run_config().record_payloads,
2361 ctx.run_config().trace_payload_max_bytes,
2362 );
2363 tracing::debug!(tool.name = %name, tool.result = %result_payload, "tool_result");
2364 retry_result = Some(value);
2365 break;
2366 }
2367 Ok(Err(e)) => {
2368 last_error = e.to_string();
2369 if attempt + 1 < max_attempts {
2370 tracing::warn!(tool.name = %name, attempt = attempt, error = %last_error, "tool execution failed, retrying");
2371 } else {
2372 tracing::warn!(tool.name = %name, error = %last_error, "tool_error");
2373 }
2374 }
2375 Err(_) => {
2376 last_error = format!(
2377 "Tool '{}' timed out after {} seconds",
2378 name, tool_timeout.as_secs()
2379 );
2380 if attempt + 1 < max_attempts {
2381 tracing::warn!(tool.name = %name, attempt = attempt, timeout_secs = tool_timeout.as_secs(), "tool timed out, retrying");
2382 } else {
2383 tracing::warn!(tool.name = %name, timeout_secs = tool_timeout.as_secs(), "tool_timeout");
2384 }
2385 }
2386 }
2387 }
2388
2389 let tool_duration = tool_start.elapsed();
2390 let (tool_success, tool_error_message, function_response) = match retry_result {
2391 Some(value) => (true, None, value),
2392 None => (false, Some(last_error.clone()), serde_json::json!({ "error": last_error })),
2393 };
2394
2395 let outcome = ToolOutcome {
2396 tool_name: name.clone(),
2397 tool_args: final_args.clone(),
2398 success: tool_success,
2399 duration: tool_duration,
2400 error_message: tool_error_message.clone(),
2401 attempt: final_attempt,
2402 };
2403 tool_outcome_for_callback = Some(outcome);
2404
2405 {
2407 let mut guard = cb_mutex.lock().unwrap_or_else(|e| e.into_inner());
2408 if let Some(ref mut cb_state) = *guard {
2409 cb_state.record(tool_outcome_for_callback.as_ref().unwrap());
2410 }
2411 }
2412
2413 let final_function_response = if !tool_success {
2415 let mut fallback_result = None;
2416 let error_msg = tool_error_message.clone().unwrap_or_default();
2417 for callback in on_tool_error_callbacks.as_ref() {
2418 match callback(
2419 ctx.clone() as Arc<dyn CallbackContext>,
2420 tool.clone(),
2421 final_args.clone(),
2422 error_msg.clone(),
2423 ).await {
2424 Ok(Some(result)) => { fallback_result = Some(result); break; }
2425 Ok(None) => continue,
2426 Err(e) => { tracing::warn!(error = %e, "on_tool_error callback failed"); break; }
2427 }
2428 }
2429 fallback_result.unwrap_or(function_response)
2430 } else {
2431 function_response
2432 };
2433
2434 let confirmation_decision = tool_actions.tool_confirmation_decision;
2435 tool_actions = tool_ctx.actions();
2436 if tool_actions.tool_confirmation_decision.is_none() {
2437 tool_actions.tool_confirmation_decision = confirmation_decision;
2438 }
2439 executed_tool = Some(tool.clone());
2440 executed_tool_response = Some(final_function_response.clone());
2441 response_content = Some(Content {
2442 role: "function".to_string(),
2443 parts: vec![Part::FunctionResponse {
2444 function_response: FunctionResponseData::from_tool_result(
2445 name.clone(),
2446 final_function_response,
2447 ),
2448 id: id.clone(),
2449 }],
2450 });
2451 } else {
2452 response_content = Some(Content {
2453 role: "function".to_string(),
2454 parts: vec![Part::FunctionResponse {
2455 function_response: FunctionResponseData::new(
2456 name.clone(),
2457 serde_json::json!({
2458 "error": format!("Tool {} not found", name)
2459 }),
2460 ),
2461 id: id.clone(),
2462 }],
2463 });
2464 }
2465 }
2466
2467 let mut response_content = response_content.expect("tool response content is set");
2469 if run_after_tool_callbacks {
2470 let outcome_ctx: Arc<dyn CallbackContext> = match tool_outcome_for_callback {
2471 Some(outcome) => Arc::new(ToolOutcomeCallbackContext {
2472 inner: ctx.clone() as Arc<dyn CallbackContext>,
2473 outcome,
2474 }),
2475 None => ctx.clone() as Arc<dyn CallbackContext>,
2476 };
2477 let cb_ctx: Arc<dyn CallbackContext> = Arc::new(ToolCallbackContext::new(
2478 outcome_ctx,
2479 name.clone(),
2480 final_args.clone(),
2481 ));
2482 for callback in after_tool_callbacks.as_ref() {
2483 match callback(cb_ctx.clone()).await {
2484 Ok(Some(modified)) => { response_content = modified; break; }
2485 Ok(None) => continue,
2486 Err(e) => {
2487 response_content = Content {
2488 role: "function".to_string(),
2489 parts: vec![Part::FunctionResponse {
2490 function_response: FunctionResponseData::new(
2491 name.clone(),
2492 serde_json::json!({ "error": e.to_string() }),
2493 ),
2494 id: id.clone(),
2495 }],
2496 };
2497 break;
2498 }
2499 }
2500 }
2501 if let (Some(tool_ref), Some(tool_resp)) = (&executed_tool, executed_tool_response) {
2502 for callback in after_tool_callbacks_full.as_ref() {
2503 match callback(
2504 cb_ctx.clone(), tool_ref.clone(), final_args.clone(), tool_resp.clone(),
2505 ).await {
2506 Ok(Some(modified_value)) => {
2507 response_content = Content {
2508 role: "function".to_string(),
2509 parts: vec![Part::FunctionResponse {
2510 function_response: FunctionResponseData::from_tool_result(
2511 name.clone(),
2512 modified_value,
2513 ),
2514 id: id.clone(),
2515 }],
2516 };
2517 break;
2518 }
2519 Ok(None) => continue,
2520 Err(e) => {
2521 response_content = Content {
2522 role: "function".to_string(),
2523 parts: vec![Part::FunctionResponse {
2524 function_response: FunctionResponseData::new(
2525 name.clone(),
2526 serde_json::json!({ "error": e.to_string() }),
2527 ),
2528 id: id.clone(),
2529 }],
2530 };
2531 break;
2532 }
2533 }
2534 }
2535 }
2536
2537 #[cfg(feature = "enhanced-plugins")]
2540 if let Some(epm) = &enhanced_plugin_manager {
2541 if let Some(tool_ref) = &executed_tool {
2542 let result_value = response_content.parts.iter()
2544 .find_map(|p| {
2545 if let Part::FunctionResponse { function_response, .. } = p {
2546 Some(function_response.response.clone())
2547 } else {
2548 None
2549 }
2550 })
2551 .unwrap_or(serde_json::json!(null));
2552
2553 match epm.run_after_tool_call(
2554 tool_ref.clone(),
2555 &final_args,
2556 result_value,
2557 ctx.clone() as Arc<dyn CallbackContext>,
2558 ).await {
2559 Ok(adk_plugin::AfterToolCallResult::Continue(modified_result)) => {
2560 response_content = Content {
2561 role: "function".to_string(),
2562 parts: vec![Part::FunctionResponse {
2563 function_response: FunctionResponseData::from_tool_result(
2564 name.clone(),
2565 modified_result,
2566 ),
2567 id: id.clone(),
2568 }],
2569 };
2570 }
2571 Err(e) => {
2572 response_content = Content {
2573 role: "function".to_string(),
2574 parts: vec![Part::FunctionResponse {
2575 function_response: FunctionResponseData::new(
2576 name.clone(),
2577 serde_json::json!({ "error": e.to_string() }),
2578 ),
2579 id: id.clone(),
2580 }],
2581 };
2582 }
2583 }
2584 }
2585 }
2586 }
2587
2588 let escalate_or_skip = tool_actions.escalate || tool_actions.skip_summarization;
2589 (idx, response_content, tool_actions, escalate_or_skip)
2590 }
2591 };
2592
2593 let mut results: Vec<(usize, Content, EventActions, bool)> = match strategy {
2595 ToolExecutionStrategy::Sequential => {
2596 let mut results = Vec::with_capacity(fc_parts.len());
2597 for (idx, name, args, id, fcid) in fc_parts {
2598 results.push(execute_one_tool(idx, name, args, id, fcid).await);
2599 }
2600 results
2601 }
2602 ToolExecutionStrategy::Parallel => {
2603 use futures::StreamExt as _;
2604 let buffer_size = fc_parts.len().max(1);
2609 futures::stream::iter(fc_parts.into_iter().map(
2610 |(idx, name, args, id, fcid)| {
2611 execute_one_tool(idx, name, args, id, fcid)
2612 },
2613 ))
2614 .buffer_unordered(buffer_size)
2615 .collect()
2616 .await
2617 }
2618 ToolExecutionStrategy::Auto => {
2619 let mut read_only_fcs = Vec::new();
2621 let mut mutable_fcs = Vec::new();
2622 for fc in fc_parts {
2623 let is_ro = tool_map.get(&fc.1)
2624 .map(|t| t.is_read_only())
2625 .unwrap_or(false);
2626 if is_ro { read_only_fcs.push(fc); } else { mutable_fcs.push(fc); }
2627 }
2628 let mut all_results = Vec::new();
2629 if !read_only_fcs.is_empty() {
2633 use futures::StreamExt as _;
2634 let buffer_size = read_only_fcs.len().max(1);
2635 all_results.extend(
2636 futures::stream::iter(read_only_fcs.into_iter().map(
2637 |(idx, name, args, id, fcid)| {
2638 execute_one_tool(idx, name, args, id, fcid)
2639 },
2640 ))
2641 .buffer_unordered(buffer_size)
2642 .collect::<Vec<_>>()
2643 .await,
2644 );
2645 }
2646 for (idx, name, args, id, fcid) in mutable_fcs {
2648 all_results.push(execute_one_tool(idx, name, args, id, fcid).await);
2649 }
2650 all_results
2651 }
2652 };
2653 results.sort_by_key(|r| r.0);
2655
2656 circuit_breaker_state = cb_mutex.into_inner().unwrap_or_else(|e| e.into_inner());
2658
2659 for (_, response_content, tool_actions, escalate_or_skip) in results {
2661 let mut tool_event = Event::new(&invocation_id);
2662 tool_event.author = agent_name.clone();
2663 tool_event.actions = tool_actions;
2664 tool_event.llm_response.content = Some(response_content.clone());
2665 yield Ok(tool_event);
2666
2667 if escalate_or_skip {
2668 return;
2669 }
2670
2671 conversation_history.push(response_content);
2672 }
2673 }
2674
2675 if all_calls_are_long_running {
2679 }
2683 }
2684
2685 for callback in after_agent_callbacks.as_ref() {
2688 match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
2689 Ok(Some(content)) => {
2690 let mut after_event = Event::new(&invocation_id);
2692 after_event.author = agent_name.clone();
2693 after_event.llm_response.content = Some(content);
2694 yield Ok(after_event);
2695 break; }
2697 Ok(None) => {
2698 continue;
2700 }
2701 Err(e) => {
2702 yield Err(e);
2704 return;
2705 }
2706 }
2707 }
2708 };
2709
2710 Ok(Box::pin(s))
2711 }
2712}