Skip to main content

adk_agent/
llm_agent.rs

1use adk_core::{
2    AfterAgentCallback, AfterModelCallback, AfterToolCallback, AfterToolCallbackFull, Agent,
3    BeforeAgentCallback, BeforeModelCallback, BeforeModelResult, BeforeToolCallback,
4    CallbackContext, Content, Event, EventActions, FunctionResponseData, GlobalInstructionProvider,
5    InstructionProvider, InvocationContext, Llm, LlmRequest, LlmResponse, MemoryEntry,
6    OnToolErrorCallback, Part, ReadonlyContext, Result, RetryBudget, Tool, ToolCallbackContext,
7    ToolConfirmationDecision, ToolConfirmationPolicy, ToolConfirmationRequest, ToolContext,
8    ToolExecutionStrategy, ToolOutcome, Toolset,
9};
10use async_stream::stream;
11use async_trait::async_trait;
12use std::sync::{Arc, Mutex};
13use tracing::Instrument;
14
15#[cfg(feature = "enhanced-plugins")]
16use adk_plugin::{
17    BeforeModelCallResult, BeforeToolCallResult, EnhancedPlugin, EnhancedPluginManager,
18};
19
20#[cfg(feature = "skills")]
21use crate::skill_shim::load_skill_index;
22use crate::{
23    guardrails::{GuardrailSet, enforce_guardrails},
24    skill_shim::{SelectionPolicy, SkillIndex, select_skill_prompt_block},
25    tool_call_markup::normalize_option_content,
26    workflow::with_user_content_override,
27};
28
/// Default cap on the number of LLM round-trips (iterations) an agent performs before stopping.
pub const DEFAULT_MAX_ITERATIONS: u32 = 100;

/// Default timeout for a tool execution: five minutes.
pub const DEFAULT_TOOL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5 * 60);
34
35fn trace_json_payload<T: serde::Serialize>(
36    value: &T,
37    record_payloads: bool,
38    max_bytes: usize,
39) -> String {
40    let json = serde_json::to_string(value).unwrap_or_default();
41    if cfg!(feature = "record-payloads") && record_payloads {
42        return json;
43    }
44
45    let max_bytes = max_bytes.max(32);
46    if json.len() <= max_bytes {
47        return json;
48    }
49
50    let mut end = max_bytes;
51    while !json.is_char_boundary(end) {
52        end -= 1;
53    }
54    format!("{}...[truncated {} bytes]", &json[..end], json.len() - end)
55}
56
57/// An LLM-powered agent that orchestrates tool calls and sub-agent delegation.
58///
59/// `LlmAgent` is the primary agent type in ADK. It sends requests to an LLM,
60/// executes tool calls from the response, and iterates until the model produces
61/// a final text response or the iteration limit is reached.
62///
63/// Use [`LlmAgentBuilder`] (via `LlmAgent::builder()`) to construct instances.
64pub struct LlmAgent {
65    name: String,
66    description: String,
67    model: Arc<dyn Llm>,
68    instruction: Option<String>,
69    instruction_provider: Option<Arc<InstructionProvider>>,
70    global_instruction: Option<String>,
71    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
72    skills_index: Option<Arc<SkillIndex>>,
73    skill_policy: SelectionPolicy,
74    max_skill_chars: usize,
75    #[allow(dead_code)] // Part of public API via builder
76    input_schema: Option<serde_json::Value>,
77    output_schema: Option<serde_json::Value>,
78    disallow_transfer_to_parent: bool,
79    disallow_transfer_to_peers: bool,
80    include_contents: adk_core::IncludeContents,
81    tools: Vec<Arc<dyn Tool>>,
82    #[allow(dead_code)] // Used in runtime toolset resolution (task 2.2)
83    toolsets: Vec<Arc<dyn Toolset>>,
84    sub_agents: Vec<Arc<dyn Agent>>,
85    output_key: Option<String>,
86    /// Default generation config (temperature, top_p, etc.) applied to every LLM request.
87    generate_content_config: Option<adk_core::GenerateContentConfig>,
88    /// Maximum number of LLM round-trips before stopping
89    max_iterations: u32,
90    /// Timeout for individual tool executions
91    tool_timeout: std::time::Duration,
92    before_callbacks: Arc<Vec<BeforeAgentCallback>>,
93    after_callbacks: Arc<Vec<AfterAgentCallback>>,
94    before_model_callbacks: Arc<Vec<BeforeModelCallback>>,
95    after_model_callbacks: Arc<Vec<AfterModelCallback>>,
96    before_tool_callbacks: Arc<Vec<BeforeToolCallback>>,
97    after_tool_callbacks: Arc<Vec<AfterToolCallback>>,
98    on_tool_error_callbacks: Arc<Vec<OnToolErrorCallback>>,
99    /// Rich after-tool callbacks that receive tool, args, and response.
100    after_tool_callbacks_full: Arc<Vec<AfterToolCallbackFull>>,
101    /// Default retry budget applied to all tools without a per-tool override.
102    default_retry_budget: Option<RetryBudget>,
103    /// Per-tool retry budget overrides, keyed by tool name.
104    tool_retry_budgets: std::collections::HashMap<String, RetryBudget>,
105    /// Circuit breaker failure threshold. When set, tools are temporarily disabled
106    /// after this many consecutive failures within a single invocation.
107    circuit_breaker_threshold: Option<u32>,
108    tool_confirmation_policy: ToolConfirmationPolicy,
109    /// Per-agent tool execution strategy override. When `Some`, overrides the
110    /// `RunConfig` strategy for this agent's dispatch loop.
111    tool_execution_strategy: Option<ToolExecutionStrategy>,
112    input_guardrails: Arc<GuardrailSet>,
113    output_guardrails: Arc<GuardrailSet>,
114    /// Enhanced plugin manager for fine-grained tool/model call interception.
115    /// Only created when enhanced plugins are registered (zero overhead otherwise).
116    #[cfg(feature = "enhanced-plugins")]
117    enhanced_plugin_manager: Option<Arc<EnhancedPluginManager>>,
118}
119
120impl std::fmt::Debug for LlmAgent {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("LlmAgent")
123            .field("name", &self.name)
124            .field("description", &self.description)
125            .field("model", &self.model.name())
126            .field("instruction", &self.instruction)
127            .field("tools_count", &self.tools.len())
128            .field("sub_agents_count", &self.sub_agents.len())
129            .finish()
130    }
131}
132
133impl LlmAgent {
134    async fn apply_input_guardrails(
135        ctx: Arc<dyn InvocationContext>,
136        input_guardrails: Arc<GuardrailSet>,
137    ) -> Result<Arc<dyn InvocationContext>> {
138        let content =
139            enforce_guardrails(input_guardrails.as_ref(), ctx.user_content(), "input").await?;
140        if content.role != ctx.user_content().role || content.parts != ctx.user_content().parts {
141            Ok(with_user_content_override(ctx, content))
142        } else {
143            Ok(ctx)
144        }
145    }
146
147    async fn apply_output_guardrails(
148        output_guardrails: &GuardrailSet,
149        content: Content,
150    ) -> Result<Content> {
151        enforce_guardrails(output_guardrails, &content, "output").await
152    }
153
154    fn history_parts_from_provider_metadata(
155        provider_metadata: Option<&serde_json::Value>,
156    ) -> Vec<Part> {
157        let Some(provider_metadata) = provider_metadata else {
158            return Vec::new();
159        };
160
161        let history_parts = provider_metadata
162            .get("conversation_history_parts")
163            .or_else(|| {
164                provider_metadata
165                    .get("openai")
166                    .and_then(|openai| openai.get("conversation_history_parts"))
167            })
168            .and_then(serde_json::Value::as_array);
169
170        history_parts
171            .into_iter()
172            .flatten()
173            .filter_map(|value| serde_json::from_value::<Part>(value.clone()).ok())
174            .collect()
175    }
176
177    fn augment_content_for_history(
178        content: &Content,
179        provider_metadata: Option<&serde_json::Value>,
180    ) -> Content {
181        let mut augmented = content.clone();
182        augmented.parts.extend(Self::history_parts_from_provider_metadata(provider_metadata));
183        augmented
184    }
185}
186
187/// Builder for constructing an [`LlmAgent`] with all configuration options.
188pub struct LlmAgentBuilder {
189    name: String,
190    description: Option<String>,
191    model: Option<Arc<dyn Llm>>,
192    instruction: Option<String>,
193    instruction_provider: Option<Arc<InstructionProvider>>,
194    global_instruction: Option<String>,
195    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
196    skills_index: Option<Arc<SkillIndex>>,
197    skill_policy: SelectionPolicy,
198    max_skill_chars: usize,
199    input_schema: Option<serde_json::Value>,
200    output_schema: Option<serde_json::Value>,
201    disallow_transfer_to_parent: bool,
202    disallow_transfer_to_peers: bool,
203    include_contents: adk_core::IncludeContents,
204    tools: Vec<Arc<dyn Tool>>,
205    toolsets: Vec<Arc<dyn Toolset>>,
206    sub_agents: Vec<Arc<dyn Agent>>,
207    output_key: Option<String>,
208    generate_content_config: Option<adk_core::GenerateContentConfig>,
209    max_iterations: u32,
210    tool_timeout: std::time::Duration,
211    before_callbacks: Vec<BeforeAgentCallback>,
212    after_callbacks: Vec<AfterAgentCallback>,
213    before_model_callbacks: Vec<BeforeModelCallback>,
214    after_model_callbacks: Vec<AfterModelCallback>,
215    before_tool_callbacks: Vec<BeforeToolCallback>,
216    after_tool_callbacks: Vec<AfterToolCallback>,
217    on_tool_error_callbacks: Vec<OnToolErrorCallback>,
218    after_tool_callbacks_full: Vec<AfterToolCallbackFull>,
219    default_retry_budget: Option<RetryBudget>,
220    tool_retry_budgets: std::collections::HashMap<String, RetryBudget>,
221    circuit_breaker_threshold: Option<u32>,
222    tool_confirmation_policy: ToolConfirmationPolicy,
223    tool_execution_strategy: Option<ToolExecutionStrategy>,
224    input_guardrails: GuardrailSet,
225    output_guardrails: GuardrailSet,
226    /// Enhanced plugins to register on the built agent.
227    #[cfg(feature = "enhanced-plugins")]
228    enhanced_plugins: Vec<Arc<dyn EnhancedPlugin>>,
229}
230
231impl LlmAgentBuilder {
232    /// Create a new builder with the given agent name.
233    pub fn new(name: impl Into<String>) -> Self {
234        Self {
235            name: name.into(),
236            description: None,
237            model: None,
238            instruction: None,
239            instruction_provider: None,
240            global_instruction: None,
241            global_instruction_provider: None,
242            skills_index: None,
243            skill_policy: SelectionPolicy::default(),
244            max_skill_chars: 2000,
245            input_schema: None,
246            output_schema: None,
247            disallow_transfer_to_parent: false,
248            disallow_transfer_to_peers: false,
249            include_contents: adk_core::IncludeContents::Default,
250            tools: Vec::new(),
251            toolsets: Vec::new(),
252            sub_agents: Vec::new(),
253            output_key: None,
254            generate_content_config: None,
255            max_iterations: DEFAULT_MAX_ITERATIONS,
256            tool_timeout: DEFAULT_TOOL_TIMEOUT,
257            before_callbacks: Vec::new(),
258            after_callbacks: Vec::new(),
259            before_model_callbacks: Vec::new(),
260            after_model_callbacks: Vec::new(),
261            before_tool_callbacks: Vec::new(),
262            after_tool_callbacks: Vec::new(),
263            on_tool_error_callbacks: Vec::new(),
264            after_tool_callbacks_full: Vec::new(),
265            default_retry_budget: None,
266            tool_retry_budgets: std::collections::HashMap::new(),
267            circuit_breaker_threshold: None,
268            tool_confirmation_policy: ToolConfirmationPolicy::Never,
269            tool_execution_strategy: None,
270            input_guardrails: GuardrailSet::new(),
271            output_guardrails: GuardrailSet::new(),
272            #[cfg(feature = "enhanced-plugins")]
273            enhanced_plugins: Vec::new(),
274        }
275    }
276
277    /// Set the agent description.
278    pub fn description(mut self, desc: impl Into<String>) -> Self {
279        self.description = Some(desc.into());
280        self
281    }
282
283    /// Set the LLM model for this agent.
284    pub fn model(mut self, model: Arc<dyn Llm>) -> Self {
285        self.model = Some(model);
286        self
287    }
288
289    /// Set the system instruction for this agent.
290    pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
291        self.instruction = Some(instruction.into());
292        self
293    }
294
295    /// Set a dynamic instruction provider evaluated per invocation.
296    pub fn instruction_provider(mut self, provider: InstructionProvider) -> Self {
297        self.instruction_provider = Some(Arc::new(provider));
298        self
299    }
300
301    /// Set a global instruction prepended to all requests.
302    pub fn global_instruction(mut self, instruction: impl Into<String>) -> Self {
303        self.global_instruction = Some(instruction.into());
304        self
305    }
306
307    /// Set a dynamic global instruction provider evaluated per invocation.
308    pub fn global_instruction_provider(mut self, provider: GlobalInstructionProvider) -> Self {
309        self.global_instruction_provider = Some(Arc::new(provider));
310        self
311    }
312
313    /// Set a preloaded skills index for this agent.
314    #[cfg(feature = "skills")]
315    pub fn with_skills(mut self, index: SkillIndex) -> Self {
316        self.skills_index = Some(Arc::new(index));
317        self
318    }
319
320    /// Auto-load skills from `.skills/` in the current working directory.
321    #[cfg(feature = "skills")]
322    pub fn with_auto_skills(self) -> Result<Self> {
323        self.with_skills_from_root(".")
324    }
325
326    /// Auto-load skills from `.skills/` under a custom root directory.
327    #[cfg(feature = "skills")]
328    pub fn with_skills_from_root(mut self, root: impl AsRef<std::path::Path>) -> Result<Self> {
329        let index = load_skill_index(root).map_err(|e| adk_core::AdkError::agent(e.to_string()))?;
330        self.skills_index = Some(Arc::new(index));
331        Ok(self)
332    }
333
334    /// Customize skill selection behavior.
335    #[cfg(feature = "skills")]
336    pub fn with_skill_policy(mut self, policy: SelectionPolicy) -> Self {
337        self.skill_policy = policy;
338        self
339    }
340
341    /// Limit injected skill content length.
342    #[cfg(feature = "skills")]
343    pub fn with_skill_budget(mut self, max_chars: usize) -> Self {
344        self.max_skill_chars = max_chars;
345        self
346    }
347
348    /// Set a JSON schema for validating user input.
349    pub fn input_schema(mut self, schema: serde_json::Value) -> Self {
350        self.input_schema = Some(schema);
351        self
352    }
353
354    /// Set a JSON schema for structured output from the LLM.
355    pub fn output_schema(mut self, schema: serde_json::Value) -> Self {
356        self.output_schema = Some(schema);
357        self
358    }
359
360    /// Prevent this agent from transferring control back to its parent.
361    pub fn disallow_transfer_to_parent(mut self, disallow: bool) -> Self {
362        self.disallow_transfer_to_parent = disallow;
363        self
364    }
365
366    /// Prevent this agent from transferring control to peer agents.
367    pub fn disallow_transfer_to_peers(mut self, disallow: bool) -> Self {
368        self.disallow_transfer_to_peers = disallow;
369        self
370    }
371
372    /// Control which conversation history contents are included in LLM requests.
373    pub fn include_contents(mut self, include: adk_core::IncludeContents) -> Self {
374        self.include_contents = include;
375        self
376    }
377
378    /// Set a state key where the agent's final output will be stored.
379    pub fn output_key(mut self, key: impl Into<String>) -> Self {
380        self.output_key = Some(key.into());
381        self
382    }
383
384    /// Set default generation parameters (temperature, top_p, top_k, max_output_tokens)
385    /// applied to every LLM request made by this agent.
386    ///
387    /// These defaults are merged with any per-request config. If `output_schema` is also
388    /// set, the schema is preserved alongside these generation parameters.
389    ///
390    /// # Example
391    ///
392    /// ```rust,ignore
393    /// use adk_core::GenerateContentConfig;
394    ///
395    /// let agent = LlmAgentBuilder::new("my-agent")
396    ///     .model(model)
397    ///     .generate_content_config(GenerateContentConfig {
398    ///         temperature: Some(0.7),
399    ///         max_output_tokens: Some(2048),
400    ///         ..Default::default()
401    ///     })
402    ///     .build()?;
403    /// ```
404    pub fn generate_content_config(mut self, config: adk_core::GenerateContentConfig) -> Self {
405        self.generate_content_config = Some(config);
406        self
407    }
408
409    /// Set the default temperature for LLM requests.
410    /// Shorthand for setting just temperature without a full `GenerateContentConfig`.
411    pub fn temperature(mut self, temperature: f32) -> Self {
412        self.generate_content_config
413            .get_or_insert(adk_core::GenerateContentConfig::default())
414            .temperature = Some(temperature);
415        self
416    }
417
418    /// Set the default top_p for LLM requests.
419    pub fn top_p(mut self, top_p: f32) -> Self {
420        self.generate_content_config
421            .get_or_insert(adk_core::GenerateContentConfig::default())
422            .top_p = Some(top_p);
423        self
424    }
425
426    /// Set the default top_k for LLM requests.
427    pub fn top_k(mut self, top_k: i32) -> Self {
428        self.generate_content_config
429            .get_or_insert(adk_core::GenerateContentConfig::default())
430            .top_k = Some(top_k);
431        self
432    }
433
434    /// Set the default max output tokens for LLM requests.
435    pub fn max_output_tokens(mut self, max_tokens: i32) -> Self {
436        self.generate_content_config
437            .get_or_insert(adk_core::GenerateContentConfig::default())
438            .max_output_tokens = Some(max_tokens);
439        self
440    }
441
442    /// Set the maximum number of LLM round-trips (iterations) before the agent stops.
443    /// Default is 100.
444    pub fn max_iterations(mut self, max: u32) -> Self {
445        self.max_iterations = max;
446        self
447    }
448
449    /// Set the timeout for individual tool executions.
450    /// Default is 5 minutes. Tools that exceed this timeout will return an error.
451    pub fn tool_timeout(mut self, timeout: std::time::Duration) -> Self {
452        self.tool_timeout = timeout;
453        self
454    }
455
456    /// Add a tool to this agent's toolbox.
457    pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
458        self.tools.push(tool);
459        self
460    }
461
462    /// Register a dynamic toolset for per-invocation tool resolution.
463    ///
464    /// Toolsets are resolved at the start of each `run()` call using the
465    /// invocation's `ReadonlyContext`. This enables context-dependent tools
466    /// like per-user browser sessions from a pool.
467    pub fn toolset(mut self, toolset: Arc<dyn Toolset>) -> Self {
468        self.toolsets.push(toolset);
469        self
470    }
471
472    /// Add a sub-agent that this agent can delegate to.
473    pub fn sub_agent(mut self, agent: Arc<dyn Agent>) -> Self {
474        self.sub_agents.push(agent);
475        self
476    }
477
478    /// Add a before-agent callback.
479    pub fn before_callback(mut self, callback: BeforeAgentCallback) -> Self {
480        self.before_callbacks.push(callback);
481        self
482    }
483
484    /// Add an after-agent callback.
485    pub fn after_callback(mut self, callback: AfterAgentCallback) -> Self {
486        self.after_callbacks.push(callback);
487        self
488    }
489
490    /// Add a before-model callback invoked before each LLM request.
491    pub fn before_model_callback(mut self, callback: BeforeModelCallback) -> Self {
492        self.before_model_callbacks.push(callback);
493        self
494    }
495
496    /// Add an after-model callback invoked after each LLM response.
497    pub fn after_model_callback(mut self, callback: AfterModelCallback) -> Self {
498        self.after_model_callbacks.push(callback);
499        self
500    }
501
502    /// Add a before-tool callback invoked before each tool execution.
503    pub fn before_tool_callback(mut self, callback: BeforeToolCallback) -> Self {
504        self.before_tool_callbacks.push(callback);
505        self
506    }
507
508    /// Add an after-tool callback invoked after each tool execution.
509    pub fn after_tool_callback(mut self, callback: AfterToolCallback) -> Self {
510        self.after_tool_callbacks.push(callback);
511        self
512    }
513
514    /// Register a rich after-tool callback that receives the tool, arguments,
515    /// and response value.
516    ///
517    /// This is the V2 callback surface aligned with the Python/Go ADK model
518    /// where `after_tool_callback` receives the full tool execution context.
519    /// Unlike [`after_tool_callback`](Self::after_tool_callback) (which only
520    /// receives `CallbackContext`), this callback can inspect and modify tool
521    /// results directly.
522    ///
523    /// Return `Ok(None)` to keep the original response, or `Ok(Some(value))`
524    /// to replace the function response sent to the LLM.
525    ///
526    /// These callbacks run after the legacy `after_tool_callback` chain.
527    /// `ToolOutcome` is available via `ctx.tool_outcome()`.
528    pub fn after_tool_callback_full(mut self, callback: AfterToolCallbackFull) -> Self {
529        self.after_tool_callbacks_full.push(callback);
530        self
531    }
532
533    /// Register a callback invoked when a tool execution fails
534    /// (after retries are exhausted).
535    ///
536    /// If the callback returns `Ok(Some(value))`, the value is used as a
537    /// fallback function response to the LLM. If it returns `Ok(None)`,
538    /// the next callback in the chain is tried. If no callback provides a
539    /// fallback, the original error is reported to the LLM.
540    pub fn on_tool_error(mut self, callback: OnToolErrorCallback) -> Self {
541        self.on_tool_error_callbacks.push(callback);
542        self
543    }
544
545    /// Set a default retry budget applied to all tools that do not have
546    /// a per-tool override.
547    ///
548    /// When a tool execution fails and a retry budget applies, the agent
549    /// retries up to `budget.max_retries` times with the configured delay
550    /// between attempts.
551    pub fn default_retry_budget(mut self, budget: RetryBudget) -> Self {
552        self.default_retry_budget = Some(budget);
553        self
554    }
555
556    /// Set a per-tool retry budget that overrides the default for the
557    /// named tool.
558    ///
559    /// Per-tool budgets take precedence over the default retry budget.
560    pub fn tool_retry_budget(mut self, tool_name: impl Into<String>, budget: RetryBudget) -> Self {
561        self.tool_retry_budgets.insert(tool_name.into(), budget);
562        self
563    }
564
565    /// Configure a circuit breaker that temporarily disables tools after
566    /// `threshold` consecutive failures within a single invocation.
567    ///
568    /// When a tool's consecutive failure count reaches the threshold, subsequent
569    /// calls to that tool are short-circuited with an immediate error response
570    /// until the next invocation (which resets the state).
571    pub fn circuit_breaker_threshold(mut self, threshold: u32) -> Self {
572        self.circuit_breaker_threshold = Some(threshold);
573        self
574    }
575
576    /// Configure tool confirmation requirements for this agent.
577    pub fn tool_confirmation_policy(mut self, policy: ToolConfirmationPolicy) -> Self {
578        self.tool_confirmation_policy = policy;
579        self
580    }
581
582    /// Require confirmation for a specific tool name.
583    pub fn require_tool_confirmation(mut self, tool_name: impl Into<String>) -> Self {
584        self.tool_confirmation_policy = self.tool_confirmation_policy.with_tool(tool_name);
585        self
586    }
587
588    /// Require confirmation for all tool calls.
589    pub fn require_tool_confirmation_for_all(mut self) -> Self {
590        self.tool_confirmation_policy = ToolConfirmationPolicy::Always;
591        self
592    }
593
594    /// Set the tool execution strategy for this agent.
595    ///
596    /// When set, this overrides the `RunConfig`'s `tool_execution_strategy`
597    /// for this agent's dispatch loop. When `None` (the default), the
598    /// `RunConfig` value is used.
599    pub fn tool_execution_strategy(mut self, strategy: ToolExecutionStrategy) -> Self {
600        self.tool_execution_strategy = Some(strategy);
601        self
602    }
603
604    /// Set input guardrails to validate user input before processing.
605    ///
606    /// Input guardrails run before the agent processes the request and can:
607    /// - Block harmful or off-topic content
608    /// - Redact PII from user input
609    /// - Enforce input length limits
610    ///
611    /// Requires the `guardrails` feature.
612    pub fn input_guardrails(mut self, guardrails: GuardrailSet) -> Self {
613        self.input_guardrails = guardrails;
614        self
615    }
616
617    /// Set output guardrails to validate agent responses.
618    ///
619    /// Output guardrails run after the agent generates a response and can:
620    /// - Enforce JSON schema compliance
621    /// - Redact PII from responses
622    /// - Block harmful content in responses
623    ///
624    /// Requires the `guardrails` feature.
625    pub fn output_guardrails(mut self, guardrails: GuardrailSet) -> Self {
626        self.output_guardrails = guardrails;
627        self
628    }
629
630    /// Register a single enhanced plugin for fine-grained tool/model call interception.
631    ///
632    /// Enhanced plugins can inspect and modify tool arguments, tool results,
633    /// model requests, and model responses. They execute in priority order
634    /// (lower priority values execute first).
635    ///
636    /// Requires the `enhanced-plugins` feature.
637    ///
638    /// # Example
639    ///
640    /// ```rust,ignore
641    /// use std::sync::Arc;
642    /// use adk_plugin::EnhancedPlugin;
643    ///
644    /// let agent = LlmAgentBuilder::new("my-agent")
645    ///     .model(model)
646    ///     .enhanced_plugin(Arc::new(MyPlugin::new()))
647    ///     .build()?;
648    /// ```
649    #[cfg(feature = "enhanced-plugins")]
650    pub fn enhanced_plugin(mut self, plugin: Arc<dyn EnhancedPlugin>) -> Self {
651        self.enhanced_plugins.push(plugin);
652        self
653    }
654
655    /// Register multiple enhanced plugins at once.
656    ///
657    /// Plugins are sorted by priority when the agent is built. Lower priority
658    /// values execute first. Same-priority plugins execute in registration order.
659    ///
660    /// Requires the `enhanced-plugins` feature.
661    ///
662    /// # Example
663    ///
664    /// ```rust,ignore
665    /// use std::sync::Arc;
666    /// use adk_plugin::EnhancedPlugin;
667    ///
668    /// let agent = LlmAgentBuilder::new("my-agent")
669    ///     .model(model)
670    ///     .enhanced_plugins(vec![
671    ///         Arc::new(SecurityPlugin::new()),  // priority = 10
672    ///         Arc::new(LoggingPlugin::new()),   // priority = 100
673    ///     ])
674    ///     .build()?;
675    /// ```
676    #[cfg(feature = "enhanced-plugins")]
677    pub fn enhanced_plugins(mut self, plugins: Vec<Arc<dyn EnhancedPlugin>>) -> Self {
678        self.enhanced_plugins.extend(plugins);
679        self
680    }
681
682    /// Build the [`LlmAgent`], returning an error if no model was set.
683    pub fn build(self) -> Result<LlmAgent> {
684        let model = self.model.ok_or_else(|| adk_core::AdkError::agent("Model is required"))?;
685
686        let mut seen_names = std::collections::HashSet::new();
687        for agent in &self.sub_agents {
688            if !seen_names.insert(agent.name()) {
689                return Err(adk_core::AdkError::agent(format!(
690                    "Duplicate sub-agent name: {}",
691                    agent.name()
692                )));
693            }
694        }
695
696        // Construct EnhancedPluginManager only when plugins are registered (zero overhead otherwise)
697        #[cfg(feature = "enhanced-plugins")]
698        let enhanced_plugin_manager = if self.enhanced_plugins.is_empty() {
699            None
700        } else {
701            Some(Arc::new(EnhancedPluginManager::new(self.enhanced_plugins)))
702        };
703
704        Ok(LlmAgent {
705            name: self.name,
706            description: self.description.unwrap_or_default(),
707            model,
708            instruction: self.instruction,
709            instruction_provider: self.instruction_provider,
710            global_instruction: self.global_instruction,
711            global_instruction_provider: self.global_instruction_provider,
712            skills_index: self.skills_index,
713            skill_policy: self.skill_policy,
714            max_skill_chars: self.max_skill_chars,
715            input_schema: self.input_schema,
716            output_schema: self.output_schema,
717            disallow_transfer_to_parent: self.disallow_transfer_to_parent,
718            disallow_transfer_to_peers: self.disallow_transfer_to_peers,
719            include_contents: self.include_contents,
720            tools: self.tools,
721            toolsets: self.toolsets,
722            sub_agents: self.sub_agents,
723            output_key: self.output_key,
724            generate_content_config: self.generate_content_config,
725            max_iterations: self.max_iterations,
726            tool_timeout: self.tool_timeout,
727            before_callbacks: Arc::new(self.before_callbacks),
728            after_callbacks: Arc::new(self.after_callbacks),
729            before_model_callbacks: Arc::new(self.before_model_callbacks),
730            after_model_callbacks: Arc::new(self.after_model_callbacks),
731            before_tool_callbacks: Arc::new(self.before_tool_callbacks),
732            after_tool_callbacks: Arc::new(self.after_tool_callbacks),
733            on_tool_error_callbacks: Arc::new(self.on_tool_error_callbacks),
734            after_tool_callbacks_full: Arc::new(self.after_tool_callbacks_full),
735            default_retry_budget: self.default_retry_budget,
736            tool_retry_budgets: self.tool_retry_budgets,
737            circuit_breaker_threshold: self.circuit_breaker_threshold,
738            tool_confirmation_policy: self.tool_confirmation_policy,
739            tool_execution_strategy: self.tool_execution_strategy,
740            input_guardrails: Arc::new(self.input_guardrails),
741            output_guardrails: Arc::new(self.output_guardrails),
742            #[cfg(feature = "enhanced-plugins")]
743            enhanced_plugin_manager,
744        })
745    }
746}
747
748// AgentToolContext wraps the parent InvocationContext and preserves all context
749// instead of throwing it away like SimpleToolContext did
750struct AgentToolContext {
751    parent_ctx: Arc<dyn InvocationContext>,
752    function_call_id: String,
753    actions: Mutex<EventActions>,
754}
755
756impl AgentToolContext {
757    fn new(parent_ctx: Arc<dyn InvocationContext>, function_call_id: String) -> Self {
758        Self { parent_ctx, function_call_id, actions: Mutex::new(EventActions::default()) }
759    }
760
761    fn actions_guard(&self) -> std::sync::MutexGuard<'_, EventActions> {
762        self.actions.lock().unwrap_or_else(|e| e.into_inner())
763    }
764}
765
766#[async_trait]
767impl ReadonlyContext for AgentToolContext {
768    fn invocation_id(&self) -> &str {
769        self.parent_ctx.invocation_id()
770    }
771
772    fn agent_name(&self) -> &str {
773        self.parent_ctx.agent_name()
774    }
775
776    fn user_id(&self) -> &str {
777        // ✅ Delegate to parent - now tools get the real user_id!
778        self.parent_ctx.user_id()
779    }
780
781    fn app_name(&self) -> &str {
782        // ✅ Delegate to parent - now tools get the real app_name!
783        self.parent_ctx.app_name()
784    }
785
786    fn session_id(&self) -> &str {
787        // ✅ Delegate to parent - now tools get the real session_id!
788        self.parent_ctx.session_id()
789    }
790
791    fn branch(&self) -> &str {
792        self.parent_ctx.branch()
793    }
794
795    fn user_content(&self) -> &Content {
796        self.parent_ctx.user_content()
797    }
798}
799
800#[async_trait]
801impl CallbackContext for AgentToolContext {
802    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
803        // ✅ Delegate to parent - tools can now access artifacts!
804        self.parent_ctx.artifacts()
805    }
806
807    fn shared_state(&self) -> Option<Arc<adk_core::SharedState>> {
808        self.parent_ctx.shared_state()
809    }
810}
811
812#[async_trait]
813impl ToolContext for AgentToolContext {
814    fn function_call_id(&self) -> &str {
815        &self.function_call_id
816    }
817
818    fn actions(&self) -> EventActions {
819        self.actions_guard().clone()
820    }
821
822    fn set_actions(&self, actions: EventActions) {
823        *self.actions_guard() = actions;
824    }
825
826    async fn search_memory(&self, query: &str) -> Result<Vec<MemoryEntry>> {
827        // ✅ Delegate to parent's memory if available
828        if let Some(memory) = self.parent_ctx.memory() {
829            memory.search(query).await
830        } else {
831            Ok(vec![])
832        }
833    }
834
835    fn user_scopes(&self) -> Vec<String> {
836        self.parent_ctx.user_scopes()
837    }
838
839    async fn get_secret(&self, name: &str) -> Result<Option<String>> {
840        self.parent_ctx.get_secret(name).await
841    }
842}
843
844/// Wrapper that adds ToolOutcome to an existing CallbackContext.
845/// Used only during after-tool callback invocation so callbacks
846/// can inspect structured metadata about the completed tool execution.
847struct ToolOutcomeCallbackContext {
848    inner: Arc<dyn CallbackContext>,
849    outcome: ToolOutcome,
850}
851
852#[async_trait]
853impl ReadonlyContext for ToolOutcomeCallbackContext {
854    fn invocation_id(&self) -> &str {
855        self.inner.invocation_id()
856    }
857
858    fn agent_name(&self) -> &str {
859        self.inner.agent_name()
860    }
861
862    fn user_id(&self) -> &str {
863        self.inner.user_id()
864    }
865
866    fn app_name(&self) -> &str {
867        self.inner.app_name()
868    }
869
870    fn session_id(&self) -> &str {
871        self.inner.session_id()
872    }
873
874    fn branch(&self) -> &str {
875        self.inner.branch()
876    }
877
878    fn user_content(&self) -> &Content {
879        self.inner.user_content()
880    }
881}
882
883#[async_trait]
884impl CallbackContext for ToolOutcomeCallbackContext {
885    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
886        self.inner.artifacts()
887    }
888
889    fn tool_outcome(&self) -> Option<ToolOutcome> {
890        Some(self.outcome.clone())
891    }
892}
893
894/// Per-invocation circuit breaker state.
895///
896/// Tracks consecutive failures per tool name within a single agent
897/// invocation. When a tool's consecutive failure count reaches the
898/// configured threshold the breaker "opens" and subsequent calls to
899/// that tool are short-circuited with an immediate error response.
900///
901/// The state is created fresh at the start of each `run()` call so
902/// it automatically resets between invocations.
903struct CircuitBreakerState {
904    threshold: u32,
905    /// tool_name → consecutive failure count
906    failures: std::collections::HashMap<String, u32>,
907}
908
909impl CircuitBreakerState {
910    fn new(threshold: u32) -> Self {
911        Self { threshold, failures: std::collections::HashMap::new() }
912    }
913
914    /// Returns `true` if the tool is currently tripped (open state).
915    fn is_open(&self, tool_name: &str) -> bool {
916        self.failures.get(tool_name).copied().unwrap_or(0) >= self.threshold
917    }
918
919    /// Record a tool outcome. Resets count on success, increments on failure.
920    fn record(&mut self, outcome: &ToolOutcome) {
921        if outcome.success {
922            self.failures.remove(&outcome.tool_name);
923        } else {
924            let count = self.failures.entry(outcome.tool_name.clone()).or_insert(0);
925            *count += 1;
926        }
927    }
928}
929
930#[async_trait]
931impl Agent for LlmAgent {
932    fn name(&self) -> &str {
933        &self.name
934    }
935
936    fn description(&self) -> &str {
937        &self.description
938    }
939
940    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
941        &self.sub_agents
942    }
943
944    #[adk_telemetry::instrument(
945        skip(self, ctx),
946        fields(
947            agent.name = %self.name,
948            agent.description = %self.description,
949            invocation.id = %ctx.invocation_id(),
950            user.id = %ctx.user_id(),
951            session.id = %ctx.session_id()
952        )
953    )]
954    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<adk_core::EventStream> {
955        adk_telemetry::info!("Starting agent execution");
956        let ctx = Self::apply_input_guardrails(ctx, self.input_guardrails.clone()).await?;
957
958        let agent_name = self.name.clone();
959        let invocation_id = ctx.invocation_id().to_string();
960        let model = self.model.clone();
961        let tools = self.tools.clone();
962        let toolsets = self.toolsets.clone();
963        let sub_agents = self.sub_agents.clone();
964
965        let instruction = self.instruction.clone();
966        let instruction_provider = self.instruction_provider.clone();
967        let global_instruction = self.global_instruction.clone();
968        let global_instruction_provider = self.global_instruction_provider.clone();
969        let skills_index = self.skills_index.clone();
970        let skill_policy = self.skill_policy.clone();
971        let max_skill_chars = self.max_skill_chars;
972        let output_key = self.output_key.clone();
973        let output_schema = self.output_schema.clone();
974        let generate_content_config = self.generate_content_config.clone();
975        let include_contents = self.include_contents;
976        let max_iterations = self.max_iterations;
977        let tool_timeout = self.tool_timeout;
978        // Clone Arc references (cheap)
979        let before_agent_callbacks = self.before_callbacks.clone();
980        let after_agent_callbacks = self.after_callbacks.clone();
981        let before_model_callbacks = self.before_model_callbacks.clone();
982        let after_model_callbacks = self.after_model_callbacks.clone();
983        let before_tool_callbacks = self.before_tool_callbacks.clone();
984        let after_tool_callbacks = self.after_tool_callbacks.clone();
985        let on_tool_error_callbacks = self.on_tool_error_callbacks.clone();
986        let after_tool_callbacks_full = self.after_tool_callbacks_full.clone();
987        let default_retry_budget = self.default_retry_budget.clone();
988        let tool_retry_budgets = self.tool_retry_budgets.clone();
989        let circuit_breaker_threshold = self.circuit_breaker_threshold;
990        let tool_confirmation_policy = self.tool_confirmation_policy.clone();
991        let disallow_transfer_to_parent = self.disallow_transfer_to_parent;
992        let disallow_transfer_to_peers = self.disallow_transfer_to_peers;
993        let output_guardrails = self.output_guardrails.clone();
994        let agent_tool_execution_strategy = self.tool_execution_strategy;
995        #[cfg(feature = "enhanced-plugins")]
996        let enhanced_plugin_manager = self.enhanced_plugin_manager.clone();
997
998        let s = stream! {
999            // ===== BEFORE AGENT CALLBACKS =====
1000            // Execute before the agent starts running
1001            // If any returns content, skip agent execution
1002            for callback in before_agent_callbacks.as_ref() {
1003                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1004                    Ok(Some(content)) => {
1005                        // Callback returned content - yield it and skip agent execution
1006                        let mut early_event = Event::new(&invocation_id);
1007                        early_event.author = agent_name.clone();
1008                        early_event.llm_response.content = Some(content);
1009                        yield Ok(early_event);
1010
1011                        // Skip rest of agent execution and go to after callbacks
1012                        for after_callback in after_agent_callbacks.as_ref() {
1013                            match after_callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1014                                Ok(Some(after_content)) => {
1015                                    let mut after_event = Event::new(&invocation_id);
1016                                    after_event.author = agent_name.clone();
1017                                    after_event.llm_response.content = Some(after_content);
1018                                    yield Ok(after_event);
1019                                    return;
1020                                }
1021                                Ok(None) => continue,
1022                                Err(e) => {
1023                                    yield Err(e);
1024                                    return;
1025                                }
1026                            }
1027                        }
1028                        return;
1029                    }
1030                    Ok(None) => {
1031                        // Continue to next callback
1032                        continue;
1033                    }
1034                    Err(e) => {
1035                        // Callback failed - propagate error
1036                        yield Err(e);
1037                        return;
1038                    }
1039                }
1040            }
1041
1042            // ===== MAIN AGENT EXECUTION =====
1043            let mut prompt_preamble = Vec::new();
1044
1045            // ===== PROCESS SKILL CONTEXT =====
1046            // If skills are configured, select the most relevant skill from user input
1047            // and inject it as a compact instruction block before other prompts.
1048            if let Some(index) = &skills_index {
1049                let user_query = ctx
1050                    .user_content()
1051                    .parts
1052                    .iter()
1053                    .filter_map(|part| match part {
1054                        Part::Text { text } => Some(text.as_str()),
1055                        _ => None,
1056                    })
1057                    .collect::<Vec<_>>()
1058                    .join("\n");
1059
1060                if let Some((_matched, skill_block)) = select_skill_prompt_block(
1061                    index.as_ref(),
1062                    &user_query,
1063                    &skill_policy,
1064                    max_skill_chars,
1065                ) {
1066                    prompt_preamble.push(Content {
1067                        role: "user".to_string(),
1068                        parts: vec![Part::Text { text: skill_block }],
1069                    });
1070                }
1071            }
1072
1073            // ===== PROCESS GLOBAL INSTRUCTION =====
1074            // GlobalInstruction provides tree-wide personality/identity
1075            if let Some(provider) = &global_instruction_provider {
1076                // Dynamic global instruction via provider
1077                let global_inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
1078                if !global_inst.is_empty() {
1079                    prompt_preamble.push(Content {
1080                        role: "user".to_string(),
1081                        parts: vec![Part::Text { text: global_inst }],
1082                    });
1083                }
1084            } else if let Some(ref template) = global_instruction {
1085                // Static global instruction with template injection
1086                let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
1087                if !processed.is_empty() {
1088                    prompt_preamble.push(Content {
1089                        role: "user".to_string(),
1090                        parts: vec![Part::Text { text: processed }],
1091                    });
1092                }
1093            }
1094
1095            // ===== PROCESS AGENT INSTRUCTION =====
1096            // Agent-specific instruction
1097            if let Some(provider) = &instruction_provider {
1098                // Dynamic instruction via provider
1099                let inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
1100                if !inst.is_empty() {
1101                    prompt_preamble.push(Content {
1102                        role: "user".to_string(),
1103                        parts: vec![Part::Text { text: inst }],
1104                    });
1105                }
1106            } else if let Some(ref template) = instruction {
1107                // Static instruction with template injection
1108                let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
1109                if !processed.is_empty() {
1110                    prompt_preamble.push(Content {
1111                        role: "user".to_string(),
1112                        parts: vec![Part::Text { text: processed }],
1113                    });
1114                }
1115            }
1116
1117            // ===== LOAD SESSION HISTORY =====
1118            // Load previous conversation turns from the session
1119            // NOTE: Session history already includes the current user message (added by Runner before agent runs)
1120            // When transfer_targets is set, this agent was invoked via transfer — filter out
1121            // other agents' events so the LLM doesn't see the parent's tool calls as its own.
1122            let session_history = if !ctx.run_config().transfer_targets.is_empty() {
1123                ctx.session().conversation_history_for_agent(&agent_name)
1124            } else {
1125                ctx.session().conversation_history()
1126            };
1127            let mut session_history = session_history;
1128            let current_user_content = ctx.user_content().clone();
1129            if let Some(index) = session_history.iter().rposition(|content| content.role == "user") {
1130                session_history[index] = current_user_content.clone();
1131            } else {
1132                session_history.push(current_user_content.clone());
1133            }
1134
1135            // ===== APPLY INCLUDE_CONTENTS FILTERING =====
1136            // Control what conversation history the agent sees
1137            let mut conversation_history = match include_contents {
1138                adk_core::IncludeContents::None => {
1139                    let mut filtered = prompt_preamble.clone();
1140                    filtered.push(current_user_content);
1141                    filtered
1142                }
1143                adk_core::IncludeContents::Default => {
1144                    let mut full_history = prompt_preamble;
1145                    full_history.extend(session_history);
1146                    full_history
1147                }
1148            };
1149
1150            // ===== RESOLVE TOOLSETS =====
1151            // Start with static tools, then merge in toolset-provided tools
1152            let mut resolved_tools: Vec<Arc<dyn Tool>> = tools.clone();
1153            let static_tool_names: std::collections::HashSet<String> =
1154                tools.iter().map(|t| t.name().to_string()).collect();
1155
1156            // Track which toolset provided each tool for deterministic error messages
1157            let mut toolset_source: std::collections::HashMap<String, String> =
1158                std::collections::HashMap::new();
1159
1160            for toolset in &toolsets {
1161                let toolset_tools = match toolset
1162                    .tools(ctx.clone() as Arc<dyn ReadonlyContext>)
1163                    .await
1164                {
1165                    Ok(t) => t,
1166                    Err(e) => {
1167                        yield Err(e);
1168                        return;
1169                    }
1170                };
1171                for tool in &toolset_tools {
1172                    let name = tool.name().to_string();
1173                    // Check static-vs-toolset conflict
1174                    if static_tool_names.contains(&name) {
1175                        yield Err(adk_core::AdkError::agent(format!(
1176                            "Duplicate tool name '{name}': conflict between static tool and toolset '{}'",
1177                            toolset.name()
1178                        )));
1179                        return;
1180                    }
1181                    // Check toolset-vs-toolset conflict
1182                    if let Some(other_toolset_name) = toolset_source.get(&name) {
1183                        yield Err(adk_core::AdkError::agent(format!(
1184                            "Duplicate tool name '{name}': conflict between toolset '{}' and toolset '{}'",
1185                            other_toolset_name,
1186                            toolset.name()
1187                        )));
1188                        return;
1189                    }
1190                    toolset_source.insert(name, toolset.name().to_string());
1191                    resolved_tools.push(tool.clone());
1192                }
1193            }
1194
1195            // Build tool lookup map for O(1) access from merged resolved_tools
1196            let tool_map: std::collections::HashMap<String, Arc<dyn Tool>> = resolved_tools
1197                .iter()
1198                .map(|t| (t.name().to_string(), t.clone()))
1199                .collect();
1200
1201            // Helper: extract long-running tool IDs from content
1202            let collect_long_running_ids = |content: &Content| -> Vec<String> {
1203                content.parts.iter()
1204                    .filter_map(|p| {
1205                        if let Part::FunctionCall { name, .. } = p {
1206                            if let Some(tool) = tool_map.get(name) {
1207                                if tool.is_long_running() {
1208                                    return Some(name.clone());
1209                                }
1210                            }
1211                        }
1212                        None
1213                    })
1214                    .collect()
1215            };
1216
1217            // Build tool declarations for Gemini
1218            // Uses Tool::declaration() so provider-native built-ins can attach
1219            // adapter-specific metadata while regular function tools retain the
1220            // standard name/description/schema shape.
1221            let mut tool_declarations = std::collections::HashMap::new();
1222            for tool in &resolved_tools {
1223                tool_declarations.insert(tool.name().to_string(), tool.declaration());
1224            }
1225
1226            // Build the list of valid transfer targets.
1227            // Sources: sub_agents (always) + transfer_targets from RunConfig
1228            // (set by the runner to include parent/peers for transferred agents).
1229            // Apply disallow_transfer_to_parent / disallow_transfer_to_peers filtering.
1230            let mut valid_transfer_targets: Vec<String> = sub_agents
1231                .iter()
1232                .map(|a| a.name().to_string())
1233                .collect();
1234
1235            // Merge in runner-provided targets (parent, peers) from RunConfig
1236            let run_config_targets = &ctx.run_config().transfer_targets;
1237            let parent_agent_name = ctx.run_config().parent_agent.clone();
1238            let sub_agent_names: std::collections::HashSet<&str> = sub_agents
1239                .iter()
1240                .map(|a| a.name())
1241                .collect();
1242
1243            for target in run_config_targets {
1244                // Skip if already in the list (from sub_agents)
1245                if sub_agent_names.contains(target.as_str()) {
1246                    continue;
1247                }
1248
1249                // Apply disallow flags
1250                let is_parent = parent_agent_name.as_deref() == Some(target.as_str());
1251                if is_parent && disallow_transfer_to_parent {
1252                    continue;
1253                }
1254                if !is_parent && disallow_transfer_to_peers {
1255                    continue;
1256                }
1257
1258                valid_transfer_targets.push(target.clone());
1259            }
1260
1261            // Inject transfer_to_agent tool if there are valid targets
1262            if !valid_transfer_targets.is_empty() {
1263                let transfer_tool_name = "transfer_to_agent";
1264                let transfer_tool_decl = serde_json::json!({
1265                    "name": transfer_tool_name,
1266                    "description": format!(
1267                        "Transfer execution to another agent. Valid targets: {}",
1268                        valid_transfer_targets.join(", ")
1269                    ),
1270                    "parameters": {
1271                        "type": "object",
1272                        "properties": {
1273                            "agent_name": {
1274                                "type": "string",
1275                                "description": "The name of the agent to transfer to.",
1276                                "enum": valid_transfer_targets
1277                            }
1278                        },
1279                        "required": ["agent_name"]
1280                    }
1281                });
1282                tool_declarations.insert(transfer_tool_name.to_string(), transfer_tool_decl);
1283            }
1284
1285
1286            // ===== CIRCUIT BREAKER STATE =====
1287            // Created fresh per invocation so it resets between runs.
1288            let mut circuit_breaker_state = circuit_breaker_threshold.map(CircuitBreakerState::new);
1289
1290            // Multi-turn loop with max iterations
1291            let mut iteration = 0;
1292
1293            loop {
1294                iteration += 1;
1295                if iteration > max_iterations {
1296                    yield Err(adk_core::AdkError::agent(
1297                        format!("Max iterations ({max_iterations}) exceeded")
1298                    ));
1299                    return;
1300                }
1301
1302                // Build request with conversation history
1303                // Merge agent-level generate_content_config with output_schema.
1304                // Agent-level config provides defaults (temperature, top_p, etc.),
1305                // output_schema is layered on top as response_schema.
1306                // If the runner set a cached_content name (via automatic cache lifecycle),
1307                // merge it into the config so the provider can reuse cached content.
1308                let config = match (&generate_content_config, &output_schema) {
1309                    (Some(base), Some(schema)) => {
1310                        let mut merged = base.clone();
1311                        merged.response_schema = Some(schema.clone());
1312                        Some(merged)
1313                    }
1314                    (Some(base), None) => Some(base.clone()),
1315                    (None, Some(schema)) => Some(adk_core::GenerateContentConfig {
1316                        response_schema: Some(schema.clone()),
1317                        ..Default::default()
1318                    }),
1319                    (None, None) => None,
1320                };
1321
1322                // Layer cached_content from RunConfig onto the request config.
1323                let config = if let Some(ref cached) = ctx.run_config().cached_content {
1324                    let mut cfg = config.unwrap_or_default();
1325                    // Only set if the agent hasn't already specified one
1326                    if cfg.cached_content.is_none() {
1327                        cfg.cached_content = Some(cached.clone());
1328                    }
1329                    Some(cfg)
1330                } else {
1331                    config
1332                };
1333
1334                let request = LlmRequest {
1335                    model: model.name().to_string(),
1336                    contents: conversation_history.clone(),
1337                    tools: tool_declarations.clone(),
1338                    config,
1339                };
1340
1341                // ===== ENHANCED PLUGIN: BEFORE MODEL CALL =====
1342                // Enhanced plugins can modify the request or short-circuit the model call.
1343                // They run before legacy before_model_callbacks.
1344                #[cfg(feature = "enhanced-plugins")]
1345                let (request, model_response_override_from_plugin) = {
1346                    if let Some(epm) = &enhanced_plugin_manager {
1347                        match epm.run_before_model_call(request, ctx.clone() as Arc<dyn CallbackContext>).await {
1348                            Ok(BeforeModelCallResult::Continue(modified_request)) => {
1349                                (modified_request, None)
1350                            }
1351                            Ok(BeforeModelCallResult::ShortCircuit(response)) => {
1352                                // Use a default request since we're short-circuiting
1353                                (LlmRequest::new("", vec![]), Some(response))
1354                            }
1355                            Err(e) => {
1356                                yield Err(e);
1357                                return;
1358                            }
1359                        }
1360                    } else {
1361                        (request, None)
1362                    }
1363                };
1364                #[cfg(not(feature = "enhanced-plugins"))]
1365                let model_response_override_from_plugin: Option<LlmResponse> = None;
1366
1367                // ===== BEFORE MODEL CALLBACKS =====
1368                // These can modify the request or skip the model call by returning a response
1369                let mut current_request = request;
1370                let mut model_response_override = model_response_override_from_plugin;
1371                if model_response_override.is_none() {
1372                    for callback in before_model_callbacks.as_ref() {
1373                        match callback(ctx.clone() as Arc<dyn CallbackContext>, current_request.clone()).await {
1374                            Ok(BeforeModelResult::Continue(modified_request)) => {
1375                                // Callback may have modified the request, continue with it
1376                                current_request = modified_request;
1377                            }
1378                            Ok(BeforeModelResult::Skip(response)) => {
1379                                // Callback returned a response - skip model call
1380                                model_response_override = Some(response);
1381                                break;
1382                            }
1383                            Err(e) => {
1384                                // Callback failed - propagate error
1385                                yield Err(e);
1386                                return;
1387                            }
1388                        }
1389                    }
1390                }
1391                let request = current_request;
1392
1393                // Determine streaming source: cached response or real model
1394                let mut accumulated_content: Option<Content> = None;
1395                let mut final_provider_metadata: Option<serde_json::Value> = None;
1396
1397                if let Some(cached_response) = model_response_override {
1398                    // Use callback-provided response (e.g., from cache)
1399                    // Yield it as an event
1400                    accumulated_content = cached_response.content.clone();
1401                    final_provider_metadata = cached_response.provider_metadata.clone();
1402                    normalize_option_content(&mut accumulated_content);
1403                    if let Some(content) = accumulated_content.take() {
1404                        let has_function_calls = content
1405                            .parts
1406                            .iter()
1407                            .any(|part| matches!(part, Part::FunctionCall { .. }));
1408                        let content = if has_function_calls {
1409                            content
1410                        } else {
1411                            Self::apply_output_guardrails(output_guardrails.as_ref(), content).await?
1412                        };
1413                        accumulated_content = Some(content);
1414                    }
1415
1416                    let mut cached_event = Event::new(&invocation_id);
1417                    cached_event.author = agent_name.clone();
1418                    cached_event.llm_response.content = accumulated_content.clone();
1419                    cached_event.llm_response.provider_metadata = cached_response.provider_metadata.clone();
1420                    cached_event.llm_request = Some(serde_json::to_string(&request).unwrap_or_default());
1421                    cached_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), serde_json::to_string(&request).unwrap_or_default());
1422                    cached_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(&cached_response).unwrap_or_default());
1423
1424                    // Populate long_running_tool_ids for function calls from long-running tools
1425                    if let Some(ref content) = accumulated_content {
1426                        cached_event.long_running_tool_ids = collect_long_running_ids(content);
1427                    }
1428
1429                    yield Ok(cached_event);
1430                } else {
1431                    // Record LLM request for tracing
1432                    let request_json = serde_json::to_string(&request).unwrap_or_default();
1433                    let trace_request_json = trace_json_payload(
1434                        &request,
1435                        ctx.run_config().record_payloads,
1436                        ctx.run_config().trace_payload_max_bytes,
1437                    );
1438
1439                    // Create call_llm span with GCP attributes (works for all model types)
1440                    let llm_ts = std::time::SystemTime::now()
1441                        .duration_since(std::time::UNIX_EPOCH)
1442                        .unwrap_or_default()
1443                        .as_nanos();
1444                    let llm_event_id = format!("{}_llm_{}", invocation_id, llm_ts);
1445                    let llm_span = tracing::info_span!(
1446                        "call_llm",
1447                        "gcp.vertex.agent.event_id" = %llm_event_id,
1448                        "gcp.vertex.agent.invocation_id" = %invocation_id,
1449                        "gcp.vertex.agent.session_id" = %ctx.session_id(),
1450                        "gen_ai.conversation.id" = %ctx.session_id(),
1451                        "gcp.vertex.agent.llm_request" = %trace_request_json,
1452                        "gcp.vertex.agent.llm_response" = tracing::field::Empty  // Placeholder for later recording
1453                    );
1454                    let _llm_guard = llm_span.enter();
1455
1456                    // Check streaming mode from run config
1457                    use adk_core::StreamingMode;
1458                    let streaming_mode = ctx.run_config().streaming_mode;
1459                    let should_stream_to_client = matches!(streaming_mode, StreamingMode::SSE | StreamingMode::Bidi)
1460                        && output_guardrails.is_empty();
1461
1462                    // Always use streaming internally for LLM calls
1463                    let mut response_stream = model.generate_content(request, true).await?;
1464
1465                    use futures::StreamExt;
1466
1467                    // Track last chunk for final event metadata (used in None mode)
1468                    let mut last_chunk: Option<LlmResponse> = None;
1469
1470                    // Stream and process chunks with AfterModel callbacks
1471                    while let Some(chunk_result) = response_stream.next().await {
1472                        let mut chunk = match chunk_result {
1473                            Ok(c) => c,
1474                            Err(e) => {
1475                                yield Err(e);
1476                                return;
1477                            }
1478                        };
1479
1480                        // ===== AFTER MODEL CALLBACKS (per chunk) =====
1481                        // Callbacks can modify each streaming chunk
1482                        for callback in after_model_callbacks.as_ref() {
1483                            match callback(ctx.clone() as Arc<dyn CallbackContext>, chunk.clone()).await {
1484                                Ok(Some(modified_chunk)) => {
1485                                    // Callback modified this chunk
1486                                    chunk = modified_chunk;
1487                                    break;
1488                                }
1489                                Ok(None) => {
1490                                    // Continue to next callback
1491                                    continue;
1492                                }
1493                                Err(e) => {
1494                                    // Callback failed - propagate error
1495                                    yield Err(e);
1496                                    return;
1497                                }
1498                            }
1499                        }
1500
1501                        normalize_option_content(&mut chunk.content);
1502
1503                        // Accumulate content for conversation history (always needed)
1504                        if let Some(chunk_content) = chunk.content.clone() {
1505                            if let Some(ref mut acc) = accumulated_content {
1506                                acc.parts.extend(chunk_content.parts);
1507                            } else {
1508                                accumulated_content = Some(chunk_content);
1509                            }
1510                        }
1511
1512                        // For SSE/Bidi mode with no output guardrails configured: yield each chunk immediately with stable event ID
1513                        if should_stream_to_client {
1514                            let mut partial_event = Event::with_id(&llm_event_id, &invocation_id);
1515                            partial_event.author = agent_name.clone();
1516                            partial_event.llm_request = Some(request_json.clone());
1517                            partial_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), request_json.clone());
1518                            partial_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(&chunk).unwrap_or_default());
1519                            partial_event.llm_response.partial = chunk.partial;
1520                            partial_event.llm_response.turn_complete = chunk.turn_complete;
1521                            partial_event.llm_response.finish_reason = chunk.finish_reason;
1522                            partial_event.llm_response.usage_metadata = chunk.usage_metadata.clone();
1523                            partial_event.llm_response.content = chunk.content.clone();
1524                            partial_event.llm_response.provider_metadata = chunk.provider_metadata.clone();
1525
1526                            // Populate long_running_tool_ids
1527                            if let Some(ref content) = chunk.content {
1528                                partial_event.long_running_tool_ids = collect_long_running_ids(content);
1529                            }
1530
1531                            yield Ok(partial_event);
1532                        }
1533
1534                        // Store last chunk for final event metadata
1535                        last_chunk = Some(chunk.clone());
1536
1537                        // Check if turn is complete
1538                        if chunk.turn_complete {
1539                            break;
1540                        }
1541                    }
1542
1543                    // When not streaming to the client (non-SSE/Bidi mode, or output guardrails are configured): yield a single final event with accumulated content
1544                    if !should_stream_to_client {
1545                        if let Some(content) = accumulated_content.take() {
1546                            let has_function_calls = content
1547                                .parts
1548                                .iter()
1549                                .any(|part| matches!(part, Part::FunctionCall { .. }));
1550                            let content = if has_function_calls {
1551                                content
1552                            } else {
1553                                Self::apply_output_guardrails(output_guardrails.as_ref(), content).await?
1554                            };
1555                            accumulated_content = Some(content);
1556                        }
1557
1558                        let mut final_event = Event::with_id(&llm_event_id, &invocation_id);
1559                        final_event.author = agent_name.clone();
1560                        final_event.llm_request = Some(request_json.clone());
1561                        final_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), request_json.clone());
1562                        final_event.llm_response.content = accumulated_content.clone();
1563                        final_event.llm_response.partial = false;
1564                        final_event.llm_response.turn_complete = true;
1565
1566                        // Copy metadata from last chunk
1567                        if let Some(ref last) = last_chunk {
1568                            final_event.llm_response.finish_reason = last.finish_reason;
1569                            final_event.llm_response.usage_metadata = last.usage_metadata.clone();
1570                            final_event.llm_response.provider_metadata = last.provider_metadata.clone();
1571                            final_provider_metadata = last.provider_metadata.clone();
1572                            final_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(last).unwrap_or_default());
1573                        }
1574
1575                        // Populate long_running_tool_ids
1576                        if let Some(ref content) = accumulated_content {
1577                            final_event.long_running_tool_ids = collect_long_running_ids(content);
1578                        }
1579
1580                        yield Ok(final_event);
1581                    }
1582
1583                    // Record LLM response to span before guard drops
1584                    if let Some(ref content) = accumulated_content {
1585                        let response_json = trace_json_payload(
1586                            content,
1587                            ctx.run_config().record_payloads,
1588                            ctx.run_config().trace_payload_max_bytes,
1589                        );
1590                        llm_span.record("gcp.vertex.agent.llm_response", &response_json);
1591                    }
1592                }
1593
1594                // ===== ENHANCED PLUGIN: AFTER MODEL CALL =====
1595                // Enhanced plugins can modify the accumulated model response.
1596                // They run after the full response is accumulated (not per-chunk).
1597                #[cfg(feature = "enhanced-plugins")]
1598                if let Some(epm) = &enhanced_plugin_manager {
1599                    if let Some(ref content) = accumulated_content {
1600                        let response_for_hook = LlmResponse {
1601                            content: Some(content.clone()),
1602                            provider_metadata: final_provider_metadata.clone(),
1603                            ..Default::default()
1604                        };
1605                        match epm.run_after_model_call(response_for_hook, ctx.clone() as Arc<dyn CallbackContext>).await {
1606                            Ok(adk_plugin::AfterModelCallResult::Continue(modified_response)) => {
1607                                accumulated_content = modified_response.content;
1608                                if modified_response.provider_metadata.is_some() {
1609                                    final_provider_metadata = modified_response.provider_metadata;
1610                                }
1611                            }
1612                            Err(e) => {
1613                                yield Err(e);
1614                                return;
1615                            }
1616                        }
1617                    }
1618                }
1619
1620                // After streaming/caching completes, check for function calls in accumulated content
1621                let function_call_names: Vec<String> = accumulated_content.as_ref()
1622                    .map(|c| c.parts.iter()
1623                        .filter_map(|p| {
1624                            if let Part::FunctionCall { name, .. } = p {
1625                                Some(name.clone())
1626                            } else {
1627                                None
1628                            }
1629                        })
1630                        .collect())
1631                    .unwrap_or_default();
1632
1633                let has_function_calls = !function_call_names.is_empty();
1634
1635                // Check if ALL function calls are from long-running tools
1636                // If so, we should NOT continue the loop - the tool returned a pending status
1637                // and the agent/client will poll for completion later
1638                let all_calls_are_long_running = has_function_calls && function_call_names.iter().all(|name| {
1639                    tool_map.get(name)
1640                        .map(|t| t.is_long_running())
1641                        .unwrap_or(false)
1642                });
1643
1644                // Add final content to history
1645                if let Some(ref content) = accumulated_content {
1646                    conversation_history.push(Self::augment_content_for_history(
1647                        content,
1648                        final_provider_metadata.as_ref(),
1649                    ));
1650
1651                    // Handle output_key: save final agent output to state_delta
1652                    if let Some(ref output_key) = output_key {
1653                        if !has_function_calls {  // Only save if not calling tools
1654                            let mut text_parts = String::new();
1655                            for part in &content.parts {
1656                                if let Part::Text { text } = part {
1657                                    text_parts.push_str(text);
1658                                }
1659                            }
1660                            if !text_parts.is_empty() {
1661                                // Yield a final state update event
1662                                let mut state_event = Event::new(&invocation_id);
1663                                state_event.author = agent_name.clone();
1664                                state_event.actions.state_delta.insert(
1665                                    output_key.clone(),
1666                                    serde_json::Value::String(text_parts),
1667                                );
1668                                yield Ok(state_event);
1669                            }
1670                        }
1671                    }
1672                }
1673
1674                if !has_function_calls {
1675                    // No function calls, we're done
1676                    // Record LLM response for tracing
1677                    if let Some(ref content) = accumulated_content {
1678                        let response_json = trace_json_payload(
1679                            content,
1680                            ctx.run_config().record_payloads,
1681                            ctx.run_config().trace_payload_max_bytes,
1682                        );
1683                        tracing::Span::current().record("gcp.vertex.agent.llm_response", &response_json);
1684                    }
1685
1686                    tracing::info!(agent.name = %agent_name, "Agent execution complete");
1687                    break;
1688                }
1689
1690                // Execute function calls and add responses to history
1691                if let Some(content) = &accumulated_content {
1692                    // ===== RESOLVE TOOL EXECUTION STRATEGY =====
1693                    // Per-agent override; defaults to Sequential if not set.
1694                    let strategy = agent_tool_execution_strategy
1695                        .unwrap_or(ToolExecutionStrategy::Sequential);
1696
1697                    // Collect function call parts with original indices for
1698                    // order-preserving reassembly in parallel/auto modes.
1699                    // Tuple: (index, name, args, id, function_call_id)
1700                    let mut fc_parts: Vec<(usize, String, serde_json::Value, Option<String>, String)> = Vec::new();
1701                    {
1702                        let mut tci = 0usize;
1703                        for part in &content.parts {
1704                            if let Part::FunctionCall { name, args, id, .. } = part {
1705                                let fallback = format!("{}_{}_{}", invocation_id, name, tci);
1706                                let fcid = id.clone().unwrap_or(fallback);
1707                                fc_parts.push((tci, name.clone(), args.clone(), id.clone(), fcid));
1708                                tci += 1;
1709                            }
1710                        }
1711                    }
1712
1713                    // ===== HANDLE transfer_to_agent BEFORE DISPATCH =====
1714                    // Transfer calls cause an immediate return from the stream,
1715                    // so they must be handled inline regardless of strategy.
1716                    let mut transfer_handled = false;
1717                    for (_, fc_name, fc_args, fc_id, _) in &fc_parts {
1718                        if fc_name == "transfer_to_agent" {
1719                            let target_agent = fc_args.get("agent_name")
1720                                .and_then(|v| v.as_str())
1721                                .unwrap_or_default()
1722                                .to_string();
1723
1724                            let valid_target = valid_transfer_targets.iter().any(|n| n == &target_agent);
1725                            if !valid_target {
1726                                let error_content = Content {
1727                                    role: "function".to_string(),
1728                                    parts: vec![Part::FunctionResponse {
1729                                        function_response: FunctionResponseData::new(
1730                                            fc_name.clone(),
1731                                            serde_json::json!({
1732                                                "error": format!(
1733                                                    "Agent '{}' not found. Available agents: {:?}",
1734                                                    target_agent, valid_transfer_targets
1735                                                )
1736                                            }),
1737                                        ),
1738                                        id: fc_id.clone(),
1739                                    }],
1740                                };
1741                                conversation_history.push(error_content.clone());
1742                                let mut error_event = Event::new(&invocation_id);
1743                                error_event.author = agent_name.clone();
1744                                error_event.llm_response.content = Some(error_content);
1745                                yield Ok(error_event);
1746                                continue;
1747                            }
1748
1749                            let mut transfer_event = Event::new(&invocation_id);
1750                            transfer_event.author = agent_name.clone();
1751                            transfer_event.actions.transfer_to_agent = Some(target_agent);
1752                            yield Ok(transfer_event);
1753                            transfer_handled = true;
1754                            break;
1755                        }
1756                    }
1757                    if transfer_handled {
1758                        return;
1759                    }
1760
1761                    // Filter out transfer_to_agent and built-in tools
1762                    let fc_parts: Vec<_> = fc_parts.into_iter().filter(|(_, fc_name, _, _, _)| {
1763                        if fc_name == "transfer_to_agent" {
1764                            return false;
1765                        }
1766                        if let Some(tool) = tool_map.get(fc_name) {
1767                            if tool.is_builtin() {
1768                                adk_telemetry::debug!(tool.name = %fc_name, "skipping built-in tool execution");
1769                                return false;
1770                            }
1771                        }
1772                        true
1773                    }).collect();
1774
1775                    // ===== TOOL CONFIRMATION PRE-CHECK =====
1776                    // Tool confirmation interrupts cause an immediate return,
1777                    // so check before parallel dispatch.
1778                    let mut confirmation_interrupted = false;
1779                    for (_, fc_name, fc_args, _, fc_call_id) in &fc_parts {
1780                        if tool_confirmation_policy.requires_confirmation(fc_name)
1781                            && ctx.run_config().tool_confirmation_decisions.get(fc_name).copied().is_none()
1782                        {
1783                                let mut ce = Event::new(&invocation_id);
1784                                ce.author = agent_name.clone();
1785                                ce.llm_response.interrupted = true;
1786                                ce.llm_response.turn_complete = true;
1787                                ce.llm_response.content = Some(Content {
1788                                    role: "model".to_string(),
1789                                    parts: vec![Part::Text {
1790                                        text: format!(
1791                                            "Tool confirmation required for '{}'. Provide approve/deny decision to continue.",
1792                                            fc_name
1793                                        ),
1794                                    }],
1795                                });
1796                                ce.actions.tool_confirmation = Some(ToolConfirmationRequest {
1797                                    tool_name: fc_name.clone(),
1798                                    function_call_id: Some(fc_call_id.clone()),
1799                                    args: fc_args.clone(),
1800                                });
1801                                yield Ok(ce);
1802                                confirmation_interrupted = true;
1803                                break;
1804                        }
1805                    }
1806                    if confirmation_interrupted {
1807                        return;
1808                    }
1809
1810                    // Wrap circuit breaker in Mutex for shared access across parallel futures.
1811                    let cb_mutex = std::sync::Mutex::new(circuit_breaker_state.take());
1812
1813                    // Create concurrency manager for semaphore-based tool dispatch enforcement.
1814                    // Per-tool overrides take precedence over the global limit.
1815                    let concurrency_manager = adk_core::ToolConcurrencyManager::new(
1816                        &ctx.run_config().tool_concurrency,
1817                    );
1818
1819                    // Per-tool execution async block. Returns (index, Content, EventActions, escalate_or_skip).
1820                    // Each tool retains its own retry budget, circuit breaker, tracing span,
1821                    // before/after callbacks, and error handling. Errors are captured as
1822                    // { "error": "..." } JSON — failed tools do not abort the batch.
1823                    let execute_one_tool = |idx: usize, name: String, args: serde_json::Value,
1824                                            id: Option<String>, function_call_id: String| {
1825                        let ctx = ctx.clone();
1826                        let tool_map = &tool_map;
1827                        let tool_retry_budgets = &tool_retry_budgets;
1828                        let default_retry_budget = &default_retry_budget;
1829                        let before_tool_callbacks = &before_tool_callbacks;
1830                        let after_tool_callbacks = &after_tool_callbacks;
1831                        let after_tool_callbacks_full = &after_tool_callbacks_full;
1832                        let on_tool_error_callbacks = &on_tool_error_callbacks;
1833                        let tool_confirmation_policy = &tool_confirmation_policy;
1834                        let cb_mutex = &cb_mutex;
1835                        let invocation_id = &invocation_id;
1836                        let concurrency_manager = &concurrency_manager;
1837                        #[cfg(feature = "enhanced-plugins")]
1838                        let enhanced_plugin_manager = &enhanced_plugin_manager;
1839                        async move {
1840                            let mut tool_actions = EventActions::default();
1841                            let mut response_content: Option<Content> = None;
1842                            let mut run_after_tool_callbacks = true;
1843                            let mut tool_outcome_for_callback: Option<ToolOutcome> = None;
1844                            let mut executed_tool: Option<Arc<dyn Tool>> = None;
1845                            let mut executed_tool_response: Option<serde_json::Value> = None;
1846
1847                            // Acquire concurrency permit before tool execution.
1848                            // The permit is held for the entire duration of this tool call
1849                            // and released on drop when this async block completes.
1850                            let _concurrency_permit = match concurrency_manager.acquire(&name).await {
1851                                Ok(permit) => Some(permit),
1852                                Err(e) => {
1853                                    // Concurrency limit reached with Fail policy — return error
1854                                    let error_content = Content {
1855                                        role: "function".to_string(),
1856                                        parts: vec![Part::FunctionResponse {
1857                                            function_response: FunctionResponseData::new(
1858                                                name.clone(),
1859                                                serde_json::json!({ "error": e.to_string() }),
1860                                            ),
1861                                            id: id.clone(),
1862                                        }],
1863                                    };
1864                                    return (idx, error_content, tool_actions, false);
1865                                }
1866                            };
1867
1868                            // Tool confirmation: record Approve, short-circuit on Deny (a missing decision is caught by the pre-check; the None arm is a fallback)
1869                            if tool_confirmation_policy.requires_confirmation(&name) {
1870                                match ctx.run_config().tool_confirmation_decisions.get(&name).copied() {
1871                                    Some(ToolConfirmationDecision::Approve) => {
1872                                        tool_actions.tool_confirmation_decision =
1873                                            Some(ToolConfirmationDecision::Approve);
1874                                    }
1875                                    Some(ToolConfirmationDecision::Deny) => {
1876                                        tool_actions.tool_confirmation_decision =
1877                                            Some(ToolConfirmationDecision::Deny);
1878                                        response_content = Some(Content {
1879                                            role: "function".to_string(),
1880                                            parts: vec![Part::FunctionResponse {
1881                                                function_response: FunctionResponseData::new(
1882                                                    name.clone(),
1883                                                    serde_json::json!({
1884                                                        "error": format!("Tool '{}' execution denied by confirmation policy", name)
1885                                                    }),
1886                                                ),
1887                                                id: id.clone(),
1888                                            }],
1889                                        });
1890                                        run_after_tool_callbacks = false;
1891                                    }
1892                                    None => {
1893                                        response_content = Some(Content {
1894                                            role: "function".to_string(),
1895                                            parts: vec![Part::FunctionResponse {
1896                                                function_response: FunctionResponseData::new(
1897                                                    name.clone(),
1898                                                    serde_json::json!({
1899                                                        "error": format!("Tool '{}' requires confirmation", name)
1900                                                    }),
1901                                                ),
1902                                                id: id.clone(),
1903                                            }],
1904                                        });
1905                                        run_after_tool_callbacks = false;
1906                                    }
1907                                }
1908                            }
1909
1910                            // Before-tool callbacks
1911                            // Track potentially modified args for enhanced plugin after-hook
1912                            #[allow(unused_mut)]
1913                            let mut final_args = args.clone();
1914
1915                            // ===== ENHANCED PLUGIN: BEFORE TOOL CALL =====
1916                            #[cfg(feature = "enhanced-plugins")]
1917                            if response_content.is_none() {
1918                                if let Some(epm) = &enhanced_plugin_manager {
1919                                    if let Some(tool_ref) = tool_map.get(&name) {
1920                                        match epm.run_before_tool_call(
1921                                            tool_ref.clone(),
1922                                            final_args.clone(),
1923                                            ctx.clone() as Arc<dyn CallbackContext>,
1924                                        ).await {
1925                                            Ok(BeforeToolCallResult::Continue(modified_args)) => {
1926                                                final_args = modified_args;
1927                                            }
1928                                            Ok(BeforeToolCallResult::ShortCircuit(synthetic_result)) => {
1929                                                // Short-circuit: use synthetic result, skip tool execution
1930                                                response_content = Some(Content {
1931                                                    role: "function".to_string(),
1932                                                    parts: vec![Part::FunctionResponse {
1933                                                        function_response: FunctionResponseData::from_tool_result(
1934                                                            name.clone(),
1935                                                            synthetic_result,
1936                                                        ),
1937                                                        id: id.clone(),
1938                                                    }],
1939                                                });
1940                                                executed_tool = Some(tool_ref.clone());
1941                                            }
1942                                            Err(e) => {
1943                                                response_content = Some(Content {
1944                                                    role: "function".to_string(),
1945                                                    parts: vec![Part::FunctionResponse {
1946                                                        function_response: FunctionResponseData::new(
1947                                                            name.clone(),
1948                                                            serde_json::json!({ "error": e.to_string() }),
1949                                                        ),
1950                                                        id: id.clone(),
1951                                                    }],
1952                                                });
1953                                                run_after_tool_callbacks = false;
1954                                            }
1955                                        }
1956                                    }
1957                                }
1958                            }
1959
1960                            if response_content.is_none() {
1961                                let tool_ctx = Arc::new(ToolCallbackContext::new(
1962                                    ctx.clone(),
1963                                    name.clone(),
1964                                    final_args.clone(),
1965                                ));
1966                                for callback in before_tool_callbacks.as_ref() {
1967                                    match callback(tool_ctx.clone() as Arc<dyn CallbackContext>).await {
1968                                        Ok(Some(c)) => { response_content = Some(c); break; }
1969                                        Ok(None) => continue,
1970                                        Err(e) => {
1971                                            response_content = Some(Content {
1972                                                role: "function".to_string(),
1973                                                parts: vec![Part::FunctionResponse {
1974                                                    function_response: FunctionResponseData::new(
1975                                                        name.clone(),
1976                                                        serde_json::json!({ "error": e.to_string() }),
1977                                                    ),
1978                                                    id: id.clone(),
1979                                                }],
1980                                            });
1981                                            run_after_tool_callbacks = false;
1982                                            break;
1983                                        }
1984                                    }
1985                                }
1986                            }
1987
1988                            // Circuit breaker check
1989                            if response_content.is_none() {
1990                                let guard = cb_mutex.lock().unwrap_or_else(|e| e.into_inner());
1991                                if let Some(ref cb_state) = *guard {
1992                                    if cb_state.is_open(&name) {
1993                                        let msg = format!(
1994                                            "Tool '{}' is temporarily disabled after {} consecutive failures",
1995                                            name, cb_state.threshold
1996                                        );
1997                                        tracing::warn!(tool.name = %name, "circuit breaker open, skipping tool execution");
1998                                        response_content = Some(Content {
1999                                            role: "function".to_string(),
2000                                            parts: vec![Part::FunctionResponse {
2001                                                function_response: FunctionResponseData::new(
2002                                                    name.clone(),
2003                                                    serde_json::json!({ "error": msg }),
2004                                                ),
2005                                                id: id.clone(),
2006                                            }],
2007                                        });
2008                                        run_after_tool_callbacks = false;
2009                                    }
2010                                }
2011                                drop(guard);
2012                            }
2013
2014                            // Execute tool with retry budget and tracing
2015                            if response_content.is_none() {
2016                                if let Some(tool) = tool_map.get(&name) {
2017                                    let tool_ctx: Arc<dyn ToolContext> = Arc::new(
2018                                        AgentToolContext::new(ctx.clone(), function_call_id.clone()),
2019                                    );
2020                                    let span_name = format!("execute_tool {name}");
2021                                    let tool_span = tracing::info_span!(
2022                                        "",
2023                                        otel.name = %span_name,
2024                                        tool.name = %name,
2025                                        "gcp.vertex.agent.event_id" = %format!("{}_{}", invocation_id, name),
2026                                        "gcp.vertex.agent.invocation_id" = %invocation_id,
2027                                        "gcp.vertex.agent.session_id" = %ctx.session_id(),
2028                                        "gen_ai.conversation.id" = %ctx.session_id()
2029                                    );
2030
2031                                    let budget = tool_retry_budgets.get(&name)
2032                                        .or(default_retry_budget.as_ref());
2033                                    let max_attempts = budget.map(|b| b.max_retries + 1).unwrap_or(1);
2034                                    let retry_delay = budget.map(|b| b.delay).unwrap_or_default();
2035
2036                                    let tool_clone = tool.clone();
2037                                    let tool_start = std::time::Instant::now();
2038                                    let mut last_error = String::new();
2039                                    let mut final_attempt: u32 = 0;
2040                                    let mut retry_result: Option<serde_json::Value> = None;
2041
2042                                    for attempt in 0..max_attempts {
2043                                        final_attempt = attempt;
2044                                        if attempt > 0 {
2045                                            tokio::time::sleep(retry_delay).await;
2046                                        }
2047                                        match async {
2048                                            let args_payload = trace_json_payload(
2049                                                &final_args,
2050                                                ctx.run_config().record_payloads,
2051                                                ctx.run_config().trace_payload_max_bytes,
2052                                            );
2053                                            tracing::debug!(tool.name = %name, tool.args = %args_payload, attempt = attempt, "tool_call");
2054                                            let exec_future = tool_clone.execute(tool_ctx.clone(), final_args.clone());
2055                                            tokio::time::timeout(tool_timeout, exec_future).await
2056                                        }.instrument(tool_span.clone()).await {
2057                                            Ok(Ok(value)) => {
2058                                                let result_payload = trace_json_payload(
2059                                                    &value,
2060                                                    ctx.run_config().record_payloads,
2061                                                    ctx.run_config().trace_payload_max_bytes,
2062                                                );
2063                                                tracing::debug!(tool.name = %name, tool.result = %result_payload, "tool_result");
2064                                                retry_result = Some(value);
2065                                                break;
2066                                            }
2067                                            Ok(Err(e)) => {
2068                                                last_error = e.to_string();
2069                                                if attempt + 1 < max_attempts {
2070                                                    tracing::warn!(tool.name = %name, attempt = attempt, error = %last_error, "tool execution failed, retrying");
2071                                                } else {
2072                                                    tracing::warn!(tool.name = %name, error = %last_error, "tool_error");
2073                                                }
2074                                            }
2075                                            Err(_) => {
2076                                                last_error = format!(
2077                                                    "Tool '{}' timed out after {} seconds",
2078                                                    name, tool_timeout.as_secs()
2079                                                );
2080                                                if attempt + 1 < max_attempts {
2081                                                    tracing::warn!(tool.name = %name, attempt = attempt, timeout_secs = tool_timeout.as_secs(), "tool timed out, retrying");
2082                                                } else {
2083                                                    tracing::warn!(tool.name = %name, timeout_secs = tool_timeout.as_secs(), "tool_timeout");
2084                                                }
2085                                            }
2086                                        }
2087                                    }
2088
2089                                    let tool_duration = tool_start.elapsed();
2090                                    let (tool_success, tool_error_message, function_response) = match retry_result {
2091                                        Some(value) => (true, None, value),
2092                                        None => (false, Some(last_error.clone()), serde_json::json!({ "error": last_error })),
2093                                    };
2094
2095                                    let outcome = ToolOutcome {
2096                                        tool_name: name.clone(),
2097                                        tool_args: final_args.clone(),
2098                                        success: tool_success,
2099                                        duration: tool_duration,
2100                                        error_message: tool_error_message.clone(),
2101                                        attempt: final_attempt,
2102                                    };
2103                                    tool_outcome_for_callback = Some(outcome);
2104
2105                                    // Circuit breaker recording
2106                                    {
2107                                        let mut guard = cb_mutex.lock().unwrap_or_else(|e| e.into_inner());
2108                                        if let Some(ref mut cb_state) = *guard {
2109                                            cb_state.record(tool_outcome_for_callback.as_ref().unwrap());
2110                                        }
2111                                    }
2112
2113                                    // On-tool-error callbacks
2114                                    let final_function_response = if !tool_success {
2115                                        let mut fallback_result = None;
2116                                        let error_msg = tool_error_message.clone().unwrap_or_default();
2117                                        for callback in on_tool_error_callbacks.as_ref() {
2118                                            match callback(
2119                                                ctx.clone() as Arc<dyn CallbackContext>,
2120                                                tool.clone(),
2121                                                final_args.clone(),
2122                                                error_msg.clone(),
2123                                            ).await {
2124                                                Ok(Some(result)) => { fallback_result = Some(result); break; }
2125                                                Ok(None) => continue,
2126                                                Err(e) => { tracing::warn!(error = %e, "on_tool_error callback failed"); break; }
2127                                            }
2128                                        }
2129                                        fallback_result.unwrap_or(function_response)
2130                                    } else {
2131                                        function_response
2132                                    };
2133
2134                                    let confirmation_decision = tool_actions.tool_confirmation_decision;
2135                                    tool_actions = tool_ctx.actions();
2136                                    if tool_actions.tool_confirmation_decision.is_none() {
2137                                        tool_actions.tool_confirmation_decision = confirmation_decision;
2138                                    }
2139                                    executed_tool = Some(tool.clone());
2140                                    executed_tool_response = Some(final_function_response.clone());
2141                                    response_content = Some(Content {
2142                                        role: "function".to_string(),
2143                                        parts: vec![Part::FunctionResponse {
2144                                            function_response: FunctionResponseData::from_tool_result(
2145                                                name.clone(),
2146                                                final_function_response,
2147                                            ),
2148                                            id: id.clone(),
2149                                        }],
2150                                    });
2151                                } else {
2152                                    response_content = Some(Content {
2153                                        role: "function".to_string(),
2154                                        parts: vec![Part::FunctionResponse {
2155                                            function_response: FunctionResponseData::new(
2156                                                name.clone(),
2157                                                serde_json::json!({
2158                                                    "error": format!("Tool {} not found", name)
2159                                                }),
2160                                            ),
2161                                            id: id.clone(),
2162                                        }],
2163                                    });
2164                                }
2165                            }
2166
2167                            // After-tool callbacks
2168                            let mut response_content = response_content.expect("tool response content is set");
2169                            if run_after_tool_callbacks {
2170                                let outcome_ctx: Arc<dyn CallbackContext> = match tool_outcome_for_callback {
2171                                    Some(outcome) => Arc::new(ToolOutcomeCallbackContext {
2172                                        inner: ctx.clone() as Arc<dyn CallbackContext>,
2173                                        outcome,
2174                                    }),
2175                                    None => ctx.clone() as Arc<dyn CallbackContext>,
2176                                };
2177                                let cb_ctx: Arc<dyn CallbackContext> = Arc::new(ToolCallbackContext::new(
2178                                    outcome_ctx,
2179                                    name.clone(),
2180                                    final_args.clone(),
2181                                ));
2182                                for callback in after_tool_callbacks.as_ref() {
2183                                    match callback(cb_ctx.clone()).await {
2184                                        Ok(Some(modified)) => { response_content = modified; break; }
2185                                        Ok(None) => continue,
2186                                        Err(e) => {
2187                                            response_content = Content {
2188                                                role: "function".to_string(),
2189                                                parts: vec![Part::FunctionResponse {
2190                                                    function_response: FunctionResponseData::new(
2191                                                        name.clone(),
2192                                                        serde_json::json!({ "error": e.to_string() }),
2193                                                    ),
2194                                                    id: id.clone(),
2195                                                }],
2196                                            };
2197                                            break;
2198                                        }
2199                                    }
2200                                }
2201                                if let (Some(tool_ref), Some(tool_resp)) = (&executed_tool, executed_tool_response) {
2202                                    for callback in after_tool_callbacks_full.as_ref() {
2203                                        match callback(
2204                                            cb_ctx.clone(), tool_ref.clone(), final_args.clone(), tool_resp.clone(),
2205                                        ).await {
2206                                            Ok(Some(modified_value)) => {
2207                                                response_content = Content {
2208                                                    role: "function".to_string(),
2209                                                    parts: vec![Part::FunctionResponse {
2210                                                        function_response: FunctionResponseData::from_tool_result(
2211                                                            name.clone(),
2212                                                            modified_value,
2213                                                        ),
2214                                                        id: id.clone(),
2215                                                    }],
2216                                                };
2217                                                break;
2218                                            }
2219                                            Ok(None) => continue,
2220                                            Err(e) => {
2221                                                response_content = Content {
2222                                                    role: "function".to_string(),
2223                                                    parts: vec![Part::FunctionResponse {
2224                                                        function_response: FunctionResponseData::new(
2225                                                            name.clone(),
2226                                                            serde_json::json!({ "error": e.to_string() }),
2227                                                        ),
2228                                                        id: id.clone(),
2229                                                    }],
2230                                                };
2231                                                break;
2232                                            }
2233                                        }
2234                                    }
2235                                }
2236
2237                                // ===== ENHANCED PLUGIN: AFTER TOOL CALL =====
2238                                // Enhanced plugins can modify the tool result after legacy callbacks.
2239                                #[cfg(feature = "enhanced-plugins")]
2240                                if let Some(epm) = &enhanced_plugin_manager {
2241                                    if let Some(tool_ref) = &executed_tool {
2242                                        // Extract the result value from the response content
2243                                        let result_value = response_content.parts.iter()
2244                                            .find_map(|p| {
2245                                                if let Part::FunctionResponse { function_response, .. } = p {
2246                                                    Some(function_response.response.clone())
2247                                                } else {
2248                                                    None
2249                                                }
2250                                            })
2251                                            .unwrap_or(serde_json::json!(null));
2252
2253                                        match epm.run_after_tool_call(
2254                                            tool_ref.clone(),
2255                                            &final_args,
2256                                            result_value,
2257                                            ctx.clone() as Arc<dyn CallbackContext>,
2258                                        ).await {
2259                                            Ok(adk_plugin::AfterToolCallResult::Continue(modified_result)) => {
2260                                                response_content = Content {
2261                                                    role: "function".to_string(),
2262                                                    parts: vec![Part::FunctionResponse {
2263                                                        function_response: FunctionResponseData::from_tool_result(
2264                                                            name.clone(),
2265                                                            modified_result,
2266                                                        ),
2267                                                        id: id.clone(),
2268                                                    }],
2269                                                };
2270                                            }
2271                                            Err(e) => {
2272                                                response_content = Content {
2273                                                    role: "function".to_string(),
2274                                                    parts: vec![Part::FunctionResponse {
2275                                                        function_response: FunctionResponseData::new(
2276                                                            name.clone(),
2277                                                            serde_json::json!({ "error": e.to_string() }),
2278                                                        ),
2279                                                        id: id.clone(),
2280                                                    }],
2281                                                };
2282                                            }
2283                                        }
2284                                    }
2285                                }
2286                            }
2287
2288                            let escalate_or_skip = tool_actions.escalate || tool_actions.skip_summarization;
2289                            (idx, response_content, tool_actions, escalate_or_skip)
2290                        }
2291                    };
2292
2293                    // ===== DISPATCH BASED ON STRATEGY =====
2294                    let mut results: Vec<(usize, Content, EventActions, bool)> = match strategy {
2295                        ToolExecutionStrategy::Sequential => {
2296                            let mut results = Vec::with_capacity(fc_parts.len());
2297                            for (idx, name, args, id, fcid) in fc_parts {
2298                                results.push(execute_one_tool(idx, name, args, id, fcid).await);
2299                            }
2300                            results
2301                        }
2302                        ToolExecutionStrategy::Parallel => {
2303                            use futures::StreamExt as _;
2304                            // All concurrency enforcement is handled by the
2305                            // ToolConcurrencyManager semaphore inside execute_one_tool.
2306                            // Use fc_parts.len() as buffer so all futures can start
2307                            // and queue on the semaphore for proper per-tool limiting.
2308                            let buffer_size = fc_parts.len().max(1);
2309                            futures::stream::iter(fc_parts.into_iter().map(
2310                                |(idx, name, args, id, fcid)| {
2311                                    execute_one_tool(idx, name, args, id, fcid)
2312                                },
2313                            ))
2314                            .buffer_unordered(buffer_size)
2315                            .collect()
2316                            .await
2317                        }
2318                        ToolExecutionStrategy::Auto => {
2319                            // Partition by is_read_only()
2320                            let mut read_only_fcs = Vec::new();
2321                            let mut mutable_fcs = Vec::new();
2322                            for fc in fc_parts {
2323                                let is_ro = tool_map.get(&fc.1)
2324                                    .map(|t| t.is_read_only())
2325                                    .unwrap_or(false);
2326                                if is_ro { read_only_fcs.push(fc); } else { mutable_fcs.push(fc); }
2327                            }
2328                            let mut all_results = Vec::new();
2329                            // Execute read-only tools concurrently first
2330                            // Concurrency enforcement is handled by the semaphore
2331                            // inside execute_one_tool.
2332                            if !read_only_fcs.is_empty() {
2333                                use futures::StreamExt as _;
2334                                let buffer_size = read_only_fcs.len().max(1);
2335                                all_results.extend(
2336                                    futures::stream::iter(read_only_fcs.into_iter().map(
2337                                        |(idx, name, args, id, fcid)| {
2338                                            execute_one_tool(idx, name, args, id, fcid)
2339                                        },
2340                                    ))
2341                                    .buffer_unordered(buffer_size)
2342                                    .collect::<Vec<_>>()
2343                                    .await,
2344                                );
2345                            }
2346                            // Then execute mutable tools sequentially
2347                            for (idx, name, args, id, fcid) in mutable_fcs {
2348                                all_results.push(execute_one_tool(idx, name, args, id, fcid).await);
2349                            }
2350                            all_results
2351                        }
2352                    };
2353                    // Preserve LLM-returned order even when tool futures finish out of order.
2354                    results.sort_by_key(|r| r.0);
2355
2356                    // Restore circuit breaker state from the mutex
2357                    circuit_breaker_state = cb_mutex.into_inner().unwrap_or_else(|e| e.into_inner());
2358
2359                    // Yield results in original order
2360                    for (_, response_content, tool_actions, escalate_or_skip) in results {
2361                        let mut tool_event = Event::new(&invocation_id);
2362                        tool_event.author = agent_name.clone();
2363                        tool_event.actions = tool_actions;
2364                        tool_event.llm_response.content = Some(response_content.clone());
2365                        yield Ok(tool_event);
2366
2367                        if escalate_or_skip {
2368                            return;
2369                        }
2370
2371                        conversation_history.push(response_content);
2372                    }
2373                }
2374
2375                // If all function calls were from long-running tools, we need ONE more model call
2376                // to let the model generate a user-friendly response about the pending task.
2377                // This branch sets no flag: the loop is expected to end on the next iteration, when the model replies without function calls.
2378                if all_calls_are_long_running {
2379                    // Continue to next iteration for model to respond, but this will be the last
2380                    // The model will see the tool response and generate text like "Started task X..."
2381                    // On next iteration, there won't be function calls, so we'll break naturally
2382                }
2383            }
2384
2385            // ===== AFTER AGENT CALLBACKS =====
2386            // Execute after the agent completes
2387            for callback in after_agent_callbacks.as_ref() {
2388                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
2389                    Ok(Some(content)) => {
2390                        // Callback returned content - yield it
2391                        let mut after_event = Event::new(&invocation_id);
2392                        after_event.author = agent_name.clone();
2393                        after_event.llm_response.content = Some(content);
2394                        yield Ok(after_event);
2395                        break; // First callback that returns content wins
2396                    }
2397                    Ok(None) => {
2398                        // Continue to next callback
2399                        continue;
2400                    }
2401                    Err(e) => {
2402                        // Callback failed - propagate error
2403                        yield Err(e);
2404                        return;
2405                    }
2406                }
2407            }
2408        };
2409
2410        Ok(Box::pin(s))
2411    }
2412}