Skip to main content

adk_agent/
llm_agent.rs

1use adk_core::{
2    AfterAgentCallback, AfterModelCallback, AfterToolCallback, AfterToolCallbackFull, Agent,
3    BeforeAgentCallback, BeforeModelCallback, BeforeModelResult, BeforeToolCallback,
4    CallbackContext, Content, Event, EventActions, FunctionResponseData, GlobalInstructionProvider,
5    InstructionProvider, InvocationContext, Llm, LlmRequest, LlmResponse, MemoryEntry,
6    OnToolErrorCallback, Part, ReadonlyContext, Result, RetryBudget, Tool, ToolCallbackContext,
7    ToolConfirmationDecision, ToolConfirmationPolicy, ToolConfirmationRequest, ToolContext,
8    ToolExecutionStrategy, ToolOutcome, Toolset,
9};
10use async_stream::stream;
11use async_trait::async_trait;
12use std::sync::{Arc, Mutex};
13use tracing::Instrument;
14
15#[cfg(feature = "enhanced-plugins")]
16use adk_plugin::{
17    BeforeModelCallResult, BeforeToolCallResult, EnhancedPlugin, EnhancedPluginManager,
18};
19
20#[cfg(feature = "skills")]
21use crate::skill_shim::load_skill_index;
22use crate::{
23    guardrails::{GuardrailSet, enforce_guardrails},
24    skill_shim::{SelectionPolicy, SkillIndex, select_skill_prompt_block},
25    tool_call_markup::normalize_option_content,
26    workflow::with_user_content_override,
27};
28
/// Upper bound on LLM round-trips (iterations) an agent performs before it
/// stops, unless overridden with [`LlmAgentBuilder::max_iterations`].
pub const DEFAULT_MAX_ITERATIONS: u32 = 100;

/// Per-tool execution timeout used unless overridden with
/// [`LlmAgentBuilder::tool_timeout`]: five minutes.
pub const DEFAULT_TOOL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5 * 60);
34
35fn trace_json_payload<T: serde::Serialize>(
36    value: &T,
37    record_payloads: bool,
38    max_bytes: usize,
39) -> String {
40    let json = serde_json::to_string(value).unwrap_or_default();
41    if cfg!(feature = "record-payloads") && record_payloads {
42        return json;
43    }
44
45    let max_bytes = max_bytes.max(32);
46    if json.len() <= max_bytes {
47        return json;
48    }
49
50    let mut end = max_bytes;
51    while !json.is_char_boundary(end) {
52        end -= 1;
53    }
54    format!("{}...[truncated {} bytes]", &json[..end], json.len() - end)
55}
56
/// An LLM-powered agent that orchestrates tool calls and sub-agent delegation.
///
/// `LlmAgent` is the primary agent type in ADK. It sends requests to an LLM,
/// executes tool calls from the response, and iterates until the model produces
/// a final text response or the iteration limit is reached.
///
/// Use [`LlmAgentBuilder`] (via `LlmAgent::builder()`) to construct instances.
/// Every field below is populated from the builder field of the same name.
pub struct LlmAgent {
    /// Agent name, as passed to `LlmAgentBuilder::new`.
    name: String,
    /// Human-readable description (optional on the builder; the mapping of an
    /// unset description to this `String` happens in `build`, not shown here).
    description: String,
    /// Model that receives this agent's LLM requests.
    model: Arc<dyn Llm>,
    /// Static system instruction, if one was configured.
    instruction: Option<String>,
    /// Dynamic instruction provider, evaluated per invocation per the builder docs.
    instruction_provider: Option<Arc<InstructionProvider>>,
    /// Static global instruction, if one was configured.
    global_instruction: Option<String>,
    /// Dynamic global instruction provider, evaluated per invocation per the builder docs.
    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
    /// Preloaded skills index (only settable with the `skills` feature).
    skills_index: Option<Arc<SkillIndex>>,
    /// Policy used when selecting skills from `skills_index`.
    skill_policy: SelectionPolicy,
    /// Limit on injected skill content length (builder default: 2000).
    max_skill_chars: usize,
    #[allow(dead_code)] // rustc reports this as never read; kept because the builder exposes it.
    input_schema: Option<serde_json::Value>,
    /// JSON schema the final output is expected to satisfy, if any.
    output_schema: Option<serde_json::Value>,
    /// Maximum retry attempts for output schema validation (default: 3).
    output_max_retries: usize,
    /// When true, this agent may not hand control back to its parent.
    disallow_transfer_to_parent: bool,
    /// When true, this agent may not hand control to peer agents.
    disallow_transfer_to_peers: bool,
    /// Which conversation history contents are included in LLM requests.
    include_contents: adk_core::IncludeContents,
    /// Statically registered tools.
    tools: Vec<Arc<dyn Tool>>,
    /// Toolsets resolved per invocation (see `LlmAgentBuilder::toolset`).
    toolsets: Vec<Arc<dyn Toolset>>,
    /// Agents this agent can delegate to.
    sub_agents: Vec<Arc<dyn Agent>>,
    /// State key under which the final output is stored, if any.
    output_key: Option<String>,
    /// Default generation config (temperature, top_p, etc.) applied to every LLM request.
    generate_content_config: Option<adk_core::GenerateContentConfig>,
    /// Maximum number of LLM round-trips before stopping.
    max_iterations: u32,
    /// Timeout for individual tool executions.
    tool_timeout: std::time::Duration,
    // The callback chains below are frozen into `Arc<Vec<_>>`; presumably so
    // they can be shared cheaply with per-invocation work — verify in `run`.
    before_callbacks: Arc<Vec<BeforeAgentCallback>>,
    after_callbacks: Arc<Vec<AfterAgentCallback>>,
    before_model_callbacks: Arc<Vec<BeforeModelCallback>>,
    after_model_callbacks: Arc<Vec<AfterModelCallback>>,
    before_tool_callbacks: Arc<Vec<BeforeToolCallback>>,
    after_tool_callbacks: Arc<Vec<AfterToolCallback>>,
    /// Fallback providers consulted when a tool fails after retries.
    on_tool_error_callbacks: Arc<Vec<OnToolErrorCallback>>,
    /// Rich after-tool callbacks that receive tool, args, and response.
    after_tool_callbacks_full: Arc<Vec<AfterToolCallbackFull>>,
    /// Default retry budget applied to all tools without a per-tool override.
    default_retry_budget: Option<RetryBudget>,
    /// Per-tool retry budget overrides, keyed by tool name.
    tool_retry_budgets: std::collections::HashMap<String, RetryBudget>,
    /// Circuit breaker failure threshold. When set, tools are temporarily disabled
    /// after this many consecutive failures within a single invocation.
    circuit_breaker_threshold: Option<u32>,
    /// Which tool calls require confirmation (builder default: `Never`).
    tool_confirmation_policy: ToolConfirmationPolicy,
    /// Per-agent tool execution strategy override. When `Some`, overrides the
    /// `RunConfig` strategy for this agent's dispatch loop.
    tool_execution_strategy: Option<ToolExecutionStrategy>,
    /// Guardrails applied to user input (see `apply_input_guardrails`).
    input_guardrails: Arc<GuardrailSet>,
    /// Guardrails applied to agent output (see `apply_output_guardrails`).
    output_guardrails: Arc<GuardrailSet>,
    /// Enhanced plugin manager for fine-grained tool/model call interception.
    /// Only created when enhanced plugins are registered (zero overhead otherwise).
    #[cfg(feature = "enhanced-plugins")]
    enhanced_plugin_manager: Option<Arc<EnhancedPluginManager>>,
    /// Optional sandbox configuration for workspace lifecycle management.
    /// When present, the SandboxRunner uses this to provision and bind tools.
    /// The config does NOT add tools directly — that is SandboxRunner's responsibility.
    #[cfg(feature = "sandbox")]
    sandbox_config: Option<adk_sandbox::workspace::SandboxConfig>,
}
125
126impl std::fmt::Debug for LlmAgent {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("LlmAgent")
129            .field("name", &self.name)
130            .field("description", &self.description)
131            .field("model", &self.model.name())
132            .field("instruction", &self.instruction)
133            .field("tools_count", &self.tools.len())
134            .field("sub_agents_count", &self.sub_agents.len())
135            .finish()
136    }
137}
138
139impl LlmAgent {
140    /// Returns the sandbox configuration attached to this agent, if any.
141    ///
142    /// The `SandboxRunner` uses this to provision a workspace and bind tools.
143    /// Returns `None` when no sandbox config was set on the builder.
144    ///
145    /// Requires the `sandbox` feature.
146    #[cfg(feature = "sandbox")]
147    pub fn sandbox_config(&self) -> Option<&adk_sandbox::workspace::SandboxConfig> {
148        self.sandbox_config.as_ref()
149    }
150
151    async fn apply_input_guardrails(
152        ctx: Arc<dyn InvocationContext>,
153        input_guardrails: Arc<GuardrailSet>,
154    ) -> Result<Arc<dyn InvocationContext>> {
155        let content =
156            enforce_guardrails(input_guardrails.as_ref(), ctx.user_content(), "input").await?;
157        if content.role != ctx.user_content().role || content.parts != ctx.user_content().parts {
158            Ok(with_user_content_override(ctx, content))
159        } else {
160            Ok(ctx)
161        }
162    }
163
164    async fn apply_output_guardrails(
165        output_guardrails: &GuardrailSet,
166        content: Content,
167    ) -> Result<Content> {
168        enforce_guardrails(output_guardrails, &content, "output").await
169    }
170
171    fn history_parts_from_provider_metadata(
172        provider_metadata: Option<&serde_json::Value>,
173    ) -> Vec<Part> {
174        let Some(provider_metadata) = provider_metadata else {
175            return Vec::new();
176        };
177
178        let history_parts = provider_metadata
179            .get("conversation_history_parts")
180            .or_else(|| {
181                provider_metadata
182                    .get("openai")
183                    .and_then(|openai| openai.get("conversation_history_parts"))
184            })
185            .and_then(serde_json::Value::as_array);
186
187        history_parts
188            .into_iter()
189            .flatten()
190            .filter_map(|value| serde_json::from_value::<Part>(value.clone()).ok())
191            .collect()
192    }
193
194    fn augment_content_for_history(
195        content: &Content,
196        provider_metadata: Option<&serde_json::Value>,
197    ) -> Content {
198        let mut augmented = content.clone();
199        augmented.parts.extend(Self::history_parts_from_provider_metadata(provider_metadata));
200        augmented
201    }
202}
203
204/// Validate a JSON string against an output schema.
205///
206/// Returns `Ok(valid_json)` if the text parses as valid JSON and passes schema
207/// validation. Returns `Err(error_message)` describing the validation failure.
208fn validate_output_against_schema(
209    text: &str,
210    schema: &serde_json::Value,
211) -> std::result::Result<serde_json::Value, String> {
212    let parsed: serde_json::Value =
213        serde_json::from_str(text).map_err(|e| format!("Response is not valid JSON: {e}"))?;
214
215    let validator =
216        jsonschema::validator_for(schema).map_err(|e| format!("Invalid schema: {e}"))?;
217
218    let errors: Vec<String> = validator.iter_errors(&parsed).map(|e| e.to_string()).collect();
219
220    if errors.is_empty() { Ok(parsed) } else { Err(errors.join("; ")) }
221}
222
223/// Extract the text content from a series of events.
224///
225/// Scans events in reverse order for the last non-empty text content
226/// produced by the agent. Used internally for output schema validation.
227fn extract_text_from_events(events: &[Event]) -> Option<String> {
228    for event in events.iter().rev() {
229        if let Some(ref content) = event.llm_response.content {
230            let text: String =
231                content
232                    .parts
233                    .iter()
234                    .filter_map(|p| {
235                        if let Part::Text { text } = p { Some(text.as_str()) } else { None }
236                    })
237                    .collect::<Vec<_>>()
238                    .join("");
239            if !text.is_empty() {
240                return Some(text);
241            }
242        }
243    }
244    None
245}
246
247/// Extract a typed value from agent events.
248///
249/// Scans events for the last text content and deserializes it into `T`.
250/// This is useful after running an agent with `output_schema` set to
251/// extract the structured result.
252///
253/// # Example
254///
255/// ```rust,ignore
256/// use serde::Deserialize;
257/// use adk_agent::extract_typed;
258///
259/// #[derive(Deserialize)]
260/// struct Weather {
261///     temperature: f64,
262///     condition: String,
263/// }
264///
265/// let events: Vec<Event> = collect_events_from_stream(stream).await?;
266/// let weather: Weather = extract_typed(&events)?;
267/// ```
268pub fn extract_typed<T: serde::de::DeserializeOwned>(events: &[Event]) -> Result<T> {
269    let text = extract_text_from_events(events).ok_or_else(|| {
270        adk_core::AdkError::agent("no text content found in events for typed extraction")
271    })?;
272
273    serde_json::from_str(&text)
274        .map_err(|e| adk_core::AdkError::agent(format!("output deserialization failed: {e}")))
275}
276
/// Builder for constructing an [`LlmAgent`] with all configuration options.
///
/// Fields mirror those on [`LlmAgent`]; [`LlmAgentBuilder::new`] sets their defaults.
pub struct LlmAgentBuilder {
    // Identity and prompting.
    name: String,
    description: Option<String>,
    model: Option<Arc<dyn Llm>>,
    instruction: Option<String>,
    instruction_provider: Option<Arc<InstructionProvider>>,
    global_instruction: Option<String>,
    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
    // Skills (setters are gated on the `skills` feature).
    skills_index: Option<Arc<SkillIndex>>,
    skill_policy: SelectionPolicy,
    max_skill_chars: usize,
    // Schemas and delegation.
    input_schema: Option<serde_json::Value>,
    output_schema: Option<serde_json::Value>,
    output_max_retries: usize,
    disallow_transfer_to_parent: bool,
    disallow_transfer_to_peers: bool,
    include_contents: adk_core::IncludeContents,
    // Capabilities.
    tools: Vec<Arc<dyn Tool>>,
    toolsets: Vec<Arc<dyn Toolset>>,
    sub_agents: Vec<Arc<dyn Agent>>,
    output_key: Option<String>,
    // Generation limits.
    generate_content_config: Option<adk_core::GenerateContentConfig>,
    max_iterations: u32,
    tool_timeout: std::time::Duration,
    // Callback chains, kept in registration order.
    before_callbacks: Vec<BeforeAgentCallback>,
    after_callbacks: Vec<AfterAgentCallback>,
    before_model_callbacks: Vec<BeforeModelCallback>,
    after_model_callbacks: Vec<AfterModelCallback>,
    before_tool_callbacks: Vec<BeforeToolCallback>,
    after_tool_callbacks: Vec<AfterToolCallback>,
    on_tool_error_callbacks: Vec<OnToolErrorCallback>,
    after_tool_callbacks_full: Vec<AfterToolCallbackFull>,
    // Tool resilience and policy.
    default_retry_budget: Option<RetryBudget>,
    tool_retry_budgets: std::collections::HashMap<String, RetryBudget>,
    circuit_breaker_threshold: Option<u32>,
    tool_confirmation_policy: ToolConfirmationPolicy,
    tool_execution_strategy: Option<ToolExecutionStrategy>,
    // Guardrails (the built agent stores these behind `Arc`).
    input_guardrails: GuardrailSet,
    output_guardrails: GuardrailSet,
    /// Enhanced plugins to register on the built agent.
    #[cfg(feature = "enhanced-plugins")]
    enhanced_plugins: Vec<Arc<dyn EnhancedPlugin>>,
    /// Optional sandbox configuration for workspace lifecycle management.
    #[cfg(feature = "sandbox")]
    sandbox_config: Option<adk_sandbox::workspace::SandboxConfig>,
}
324
325impl LlmAgentBuilder {
326    /// Create a new builder with the given agent name.
327    pub fn new(name: impl Into<String>) -> Self {
328        Self {
329            name: name.into(),
330            description: None,
331            model: None,
332            instruction: None,
333            instruction_provider: None,
334            global_instruction: None,
335            global_instruction_provider: None,
336            skills_index: None,
337            skill_policy: SelectionPolicy::default(),
338            max_skill_chars: 2000,
339            input_schema: None,
340            output_schema: None,
341            output_max_retries: 3,
342            disallow_transfer_to_parent: false,
343            disallow_transfer_to_peers: false,
344            include_contents: adk_core::IncludeContents::Default,
345            tools: Vec::new(),
346            toolsets: Vec::new(),
347            sub_agents: Vec::new(),
348            output_key: None,
349            generate_content_config: None,
350            max_iterations: DEFAULT_MAX_ITERATIONS,
351            tool_timeout: DEFAULT_TOOL_TIMEOUT,
352            before_callbacks: Vec::new(),
353            after_callbacks: Vec::new(),
354            before_model_callbacks: Vec::new(),
355            after_model_callbacks: Vec::new(),
356            before_tool_callbacks: Vec::new(),
357            after_tool_callbacks: Vec::new(),
358            on_tool_error_callbacks: Vec::new(),
359            after_tool_callbacks_full: Vec::new(),
360            default_retry_budget: None,
361            tool_retry_budgets: std::collections::HashMap::new(),
362            circuit_breaker_threshold: None,
363            tool_confirmation_policy: ToolConfirmationPolicy::Never,
364            tool_execution_strategy: None,
365            input_guardrails: GuardrailSet::new(),
366            output_guardrails: GuardrailSet::new(),
367            #[cfg(feature = "enhanced-plugins")]
368            enhanced_plugins: Vec::new(),
369            #[cfg(feature = "sandbox")]
370            sandbox_config: None,
371        }
372    }
373
374    /// Set the agent description.
375    pub fn description(mut self, desc: impl Into<String>) -> Self {
376        self.description = Some(desc.into());
377        self
378    }
379
380    /// Set the LLM model for this agent.
381    pub fn model(mut self, model: Arc<dyn Llm>) -> Self {
382        self.model = Some(model);
383        self
384    }
385
386    /// Set the system instruction for this agent.
387    pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
388        self.instruction = Some(instruction.into());
389        self
390    }
391
392    /// Set a dynamic instruction provider evaluated per invocation.
393    pub fn instruction_provider(mut self, provider: InstructionProvider) -> Self {
394        self.instruction_provider = Some(Arc::new(provider));
395        self
396    }
397
398    /// Set a global instruction prepended to all requests.
399    pub fn global_instruction(mut self, instruction: impl Into<String>) -> Self {
400        self.global_instruction = Some(instruction.into());
401        self
402    }
403
404    /// Set a dynamic global instruction provider evaluated per invocation.
405    pub fn global_instruction_provider(mut self, provider: GlobalInstructionProvider) -> Self {
406        self.global_instruction_provider = Some(Arc::new(provider));
407        self
408    }
409
410    /// Set a preloaded skills index for this agent.
411    #[cfg(feature = "skills")]
412    pub fn with_skills(mut self, index: SkillIndex) -> Self {
413        self.skills_index = Some(Arc::new(index));
414        self
415    }
416
417    /// Auto-load skills from `.skills/` in the current working directory.
418    #[cfg(feature = "skills")]
419    pub fn with_auto_skills(self) -> Result<Self> {
420        self.with_skills_from_root(".")
421    }
422
423    /// Auto-load skills from `.skills/` under a custom root directory.
424    #[cfg(feature = "skills")]
425    pub fn with_skills_from_root(mut self, root: impl AsRef<std::path::Path>) -> Result<Self> {
426        let index = load_skill_index(root).map_err(|e| adk_core::AdkError::agent(e.to_string()))?;
427        self.skills_index = Some(Arc::new(index));
428        Ok(self)
429    }
430
431    /// Customize skill selection behavior.
432    #[cfg(feature = "skills")]
433    pub fn with_skill_policy(mut self, policy: SelectionPolicy) -> Self {
434        self.skill_policy = policy;
435        self
436    }
437
438    /// Limit injected skill content length.
439    #[cfg(feature = "skills")]
440    pub fn with_skill_budget(mut self, max_chars: usize) -> Self {
441        self.max_skill_chars = max_chars;
442        self
443    }
444
445    /// Set a JSON schema for validating user input.
446    pub fn input_schema(mut self, schema: serde_json::Value) -> Self {
447        self.input_schema = Some(schema);
448        self
449    }
450
451    /// Set a JSON schema for structured output from the LLM.
452    pub fn output_schema(mut self, schema: serde_json::Value) -> Self {
453        self.output_schema = Some(schema);
454        self
455    }
456
457    /// Derive the output schema from a Rust type using `schemars`.
458    ///
459    /// This is a convenience method that generates a JSON Schema from `T`'s
460    /// `JsonSchema` implementation and sets it as the output schema.
461    ///
462    /// # Example
463    ///
464    /// ```rust,ignore
465    /// use schemars::JsonSchema;
466    /// use serde::Deserialize;
467    ///
468    /// #[derive(JsonSchema, Deserialize)]
469    /// struct MyOutput {
470    ///     name: String,
471    ///     score: f64,
472    /// }
473    ///
474    /// let agent = LlmAgentBuilder::new("my-agent")
475    ///     .model(model)
476    ///     .output_type::<MyOutput>()
477    ///     .build()?;
478    /// ```
479    pub fn output_type<T: schemars::JsonSchema>(mut self) -> Self {
480        let schema = schemars::schema_for!(T);
481        self.output_schema =
482            Some(serde_json::to_value(schema).expect("schema serialization cannot fail"));
483        self
484    }
485
486    /// Set the maximum number of retry attempts for output schema validation.
487    ///
488    /// When the LLM produces output that fails schema validation, the agent
489    /// will retry up to this many times with a correction prompt. Default is 3.
490    pub fn output_max_retries(mut self, n: usize) -> Self {
491        self.output_max_retries = n;
492        self
493    }
494
495    /// Prevent this agent from transferring control back to its parent.
496    pub fn disallow_transfer_to_parent(mut self, disallow: bool) -> Self {
497        self.disallow_transfer_to_parent = disallow;
498        self
499    }
500
501    /// Prevent this agent from transferring control to peer agents.
502    pub fn disallow_transfer_to_peers(mut self, disallow: bool) -> Self {
503        self.disallow_transfer_to_peers = disallow;
504        self
505    }
506
507    /// Control which conversation history contents are included in LLM requests.
508    pub fn include_contents(mut self, include: adk_core::IncludeContents) -> Self {
509        self.include_contents = include;
510        self
511    }
512
513    /// Set a state key where the agent's final output will be stored.
514    pub fn output_key(mut self, key: impl Into<String>) -> Self {
515        self.output_key = Some(key.into());
516        self
517    }
518
519    /// Set default generation parameters (temperature, top_p, top_k, max_output_tokens)
520    /// applied to every LLM request made by this agent.
521    ///
522    /// These defaults are merged with any per-request config. If `output_schema` is also
523    /// set, the schema is preserved alongside these generation parameters.
524    ///
525    /// # Example
526    ///
527    /// ```rust,ignore
528    /// use adk_core::GenerateContentConfig;
529    ///
530    /// let agent = LlmAgentBuilder::new("my-agent")
531    ///     .model(model)
532    ///     .generate_content_config(GenerateContentConfig {
533    ///         temperature: Some(0.7),
534    ///         max_output_tokens: Some(2048),
535    ///         ..Default::default()
536    ///     })
537    ///     .build()?;
538    /// ```
539    pub fn generate_content_config(mut self, config: adk_core::GenerateContentConfig) -> Self {
540        self.generate_content_config = Some(config);
541        self
542    }
543
544    /// Set the default temperature for LLM requests.
545    /// Shorthand for setting just temperature without a full `GenerateContentConfig`.
546    pub fn temperature(mut self, temperature: f32) -> Self {
547        self.generate_content_config
548            .get_or_insert(adk_core::GenerateContentConfig::default())
549            .temperature = Some(temperature);
550        self
551    }
552
553    /// Set the default top_p for LLM requests.
554    pub fn top_p(mut self, top_p: f32) -> Self {
555        self.generate_content_config
556            .get_or_insert(adk_core::GenerateContentConfig::default())
557            .top_p = Some(top_p);
558        self
559    }
560
561    /// Set the default top_k for LLM requests.
562    pub fn top_k(mut self, top_k: i32) -> Self {
563        self.generate_content_config
564            .get_or_insert(adk_core::GenerateContentConfig::default())
565            .top_k = Some(top_k);
566        self
567    }
568
569    /// Set the default max output tokens for LLM requests.
570    pub fn max_output_tokens(mut self, max_tokens: i32) -> Self {
571        self.generate_content_config
572            .get_or_insert(adk_core::GenerateContentConfig::default())
573            .max_output_tokens = Some(max_tokens);
574        self
575    }
576
577    /// Set the maximum number of LLM round-trips (iterations) before the agent stops.
578    /// Default is 100.
579    pub fn max_iterations(mut self, max: u32) -> Self {
580        self.max_iterations = max;
581        self
582    }
583
584    /// Set the timeout for individual tool executions.
585    /// Default is 5 minutes. Tools that exceed this timeout will return an error.
586    pub fn tool_timeout(mut self, timeout: std::time::Duration) -> Self {
587        self.tool_timeout = timeout;
588        self
589    }
590
591    /// Add a tool to this agent's toolbox.
592    pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
593        self.tools.push(tool);
594        self
595    }
596
597    /// Register a dynamic toolset for per-invocation tool resolution.
598    ///
599    /// Toolsets are resolved at the start of each `run()` call using the
600    /// invocation's `ReadonlyContext`. This enables context-dependent tools
601    /// like per-user browser sessions from a pool.
602    pub fn toolset(mut self, toolset: Arc<dyn Toolset>) -> Self {
603        self.toolsets.push(toolset);
604        self
605    }
606
607    /// Add a sub-agent that this agent can delegate to.
608    pub fn sub_agent(mut self, agent: Arc<dyn Agent>) -> Self {
609        self.sub_agents.push(agent);
610        self
611    }
612
613    /// Add a before-agent callback.
614    pub fn before_callback(mut self, callback: BeforeAgentCallback) -> Self {
615        self.before_callbacks.push(callback);
616        self
617    }
618
619    /// Add an after-agent callback.
620    pub fn after_callback(mut self, callback: AfterAgentCallback) -> Self {
621        self.after_callbacks.push(callback);
622        self
623    }
624
625    /// Add a before-model callback invoked before each LLM request.
626    pub fn before_model_callback(mut self, callback: BeforeModelCallback) -> Self {
627        self.before_model_callbacks.push(callback);
628        self
629    }
630
631    /// Add an after-model callback invoked after each LLM response.
632    pub fn after_model_callback(mut self, callback: AfterModelCallback) -> Self {
633        self.after_model_callbacks.push(callback);
634        self
635    }
636
637    /// Add a before-tool callback invoked before each tool execution.
638    pub fn before_tool_callback(mut self, callback: BeforeToolCallback) -> Self {
639        self.before_tool_callbacks.push(callback);
640        self
641    }
642
643    /// Add an after-tool callback invoked after each tool execution.
644    pub fn after_tool_callback(mut self, callback: AfterToolCallback) -> Self {
645        self.after_tool_callbacks.push(callback);
646        self
647    }
648
649    /// Register a rich after-tool callback that receives the tool, arguments,
650    /// and response value.
651    ///
652    /// This is the V2 callback surface aligned with the Python/Go ADK model
653    /// where `after_tool_callback` receives the full tool execution context.
654    /// Unlike [`after_tool_callback`](Self::after_tool_callback) (which only
655    /// receives `CallbackContext`), this callback can inspect and modify tool
656    /// results directly.
657    ///
658    /// Return `Ok(None)` to keep the original response, or `Ok(Some(value))`
659    /// to replace the function response sent to the LLM.
660    ///
661    /// These callbacks run after the legacy `after_tool_callback` chain.
662    /// `ToolOutcome` is available via `ctx.tool_outcome()`.
663    pub fn after_tool_callback_full(mut self, callback: AfterToolCallbackFull) -> Self {
664        self.after_tool_callbacks_full.push(callback);
665        self
666    }
667
668    /// Register a callback invoked when a tool execution fails
669    /// (after retries are exhausted).
670    ///
671    /// If the callback returns `Ok(Some(value))`, the value is used as a
672    /// fallback function response to the LLM. If it returns `Ok(None)`,
673    /// the next callback in the chain is tried. If no callback provides a
674    /// fallback, the original error is reported to the LLM.
675    pub fn on_tool_error(mut self, callback: OnToolErrorCallback) -> Self {
676        self.on_tool_error_callbacks.push(callback);
677        self
678    }
679
680    /// Set a default retry budget applied to all tools that do not have
681    /// a per-tool override.
682    ///
683    /// When a tool execution fails and a retry budget applies, the agent
684    /// retries up to `budget.max_retries` times with the configured delay
685    /// between attempts.
686    pub fn default_retry_budget(mut self, budget: RetryBudget) -> Self {
687        self.default_retry_budget = Some(budget);
688        self
689    }
690
691    /// Set a per-tool retry budget that overrides the default for the
692    /// named tool.
693    ///
694    /// Per-tool budgets take precedence over the default retry budget.
695    pub fn tool_retry_budget(mut self, tool_name: impl Into<String>, budget: RetryBudget) -> Self {
696        self.tool_retry_budgets.insert(tool_name.into(), budget);
697        self
698    }
699
700    /// Configure a circuit breaker that temporarily disables tools after
701    /// `threshold` consecutive failures within a single invocation.
702    ///
703    /// When a tool's consecutive failure count reaches the threshold, subsequent
704    /// calls to that tool are short-circuited with an immediate error response
705    /// until the next invocation (which resets the state).
706    pub fn circuit_breaker_threshold(mut self, threshold: u32) -> Self {
707        self.circuit_breaker_threshold = Some(threshold);
708        self
709    }
710
711    /// Configure tool confirmation requirements for this agent.
712    pub fn tool_confirmation_policy(mut self, policy: ToolConfirmationPolicy) -> Self {
713        self.tool_confirmation_policy = policy;
714        self
715    }
716
717    /// Require confirmation for a specific tool name.
718    pub fn require_tool_confirmation(mut self, tool_name: impl Into<String>) -> Self {
719        self.tool_confirmation_policy = self.tool_confirmation_policy.with_tool(tool_name);
720        self
721    }
722
723    /// Require confirmation for all tool calls.
724    pub fn require_tool_confirmation_for_all(mut self) -> Self {
725        self.tool_confirmation_policy = ToolConfirmationPolicy::Always;
726        self
727    }
728
729    /// Set the tool execution strategy for this agent.
730    ///
731    /// When set, this overrides the `RunConfig`'s `tool_execution_strategy`
732    /// for this agent's dispatch loop. When `None` (the default), the
733    /// `RunConfig` value is used.
734    pub fn tool_execution_strategy(mut self, strategy: ToolExecutionStrategy) -> Self {
735        self.tool_execution_strategy = Some(strategy);
736        self
737    }
738
739    /// Set input guardrails to validate user input before processing.
740    ///
741    /// Input guardrails run before the agent processes the request and can:
742    /// - Block harmful or off-topic content
743    /// - Redact PII from user input
744    /// - Enforce input length limits
745    ///
746    /// Requires the `guardrails` feature.
747    pub fn input_guardrails(mut self, guardrails: GuardrailSet) -> Self {
748        self.input_guardrails = guardrails;
749        self
750    }
751
752    /// Set output guardrails to validate agent responses.
753    ///
754    /// Output guardrails run after the agent generates a response and can:
755    /// - Enforce JSON schema compliance
756    /// - Redact PII from responses
757    /// - Block harmful content in responses
758    ///
759    /// Requires the `guardrails` feature.
760    pub fn output_guardrails(mut self, guardrails: GuardrailSet) -> Self {
761        self.output_guardrails = guardrails;
762        self
763    }
764
765    /// Register a single enhanced plugin for fine-grained tool/model call interception.
766    ///
767    /// Enhanced plugins can inspect and modify tool arguments, tool results,
768    /// model requests, and model responses. They execute in priority order
769    /// (lower priority values execute first).
770    ///
771    /// Requires the `enhanced-plugins` feature.
772    ///
773    /// # Example
774    ///
775    /// ```rust,ignore
776    /// use std::sync::Arc;
777    /// use adk_plugin::EnhancedPlugin;
778    ///
779    /// let agent = LlmAgentBuilder::new("my-agent")
780    ///     .model(model)
781    ///     .enhanced_plugin(Arc::new(MyPlugin::new()))
782    ///     .build()?;
783    /// ```
784    #[cfg(feature = "enhanced-plugins")]
785    pub fn enhanced_plugin(mut self, plugin: Arc<dyn EnhancedPlugin>) -> Self {
786        self.enhanced_plugins.push(plugin);
787        self
788    }
789
790    /// Register multiple enhanced plugins at once.
791    ///
792    /// Plugins are sorted by priority when the agent is built. Lower priority
793    /// values execute first. Same-priority plugins execute in registration order.
794    ///
795    /// Requires the `enhanced-plugins` feature.
796    ///
797    /// # Example
798    ///
799    /// ```rust,ignore
800    /// use std::sync::Arc;
801    /// use adk_plugin::EnhancedPlugin;
802    ///
803    /// let agent = LlmAgentBuilder::new("my-agent")
804    ///     .model(model)
805    ///     .enhanced_plugins(vec![
806    ///         Arc::new(SecurityPlugin::new()),  // priority = 10
807    ///         Arc::new(LoggingPlugin::new()),   // priority = 100
808    ///     ])
809    ///     .build()?;
810    /// ```
811    #[cfg(feature = "enhanced-plugins")]
812    pub fn enhanced_plugins(mut self, plugins: Vec<Arc<dyn EnhancedPlugin>>) -> Self {
813        self.enhanced_plugins.extend(plugins);
814        self
815    }
816
817    /// Attach a sandbox configuration for workspace lifecycle management.
818    ///
819    /// When a `SandboxConfig` is attached, the `SandboxRunner` will provision
820    /// a workspace, bind tools based on enabled capabilities, and manage the
821    /// session lifecycle. The config does NOT add tools directly to the agent —
822    /// tool binding is the responsibility of the `SandboxRunner`.
823    ///
824    /// When no `SandboxConfig` is attached, the agent behaves identically to
825    /// its behavior before this feature was introduced.
826    ///
827    /// Requires the `sandbox` feature.
828    ///
829    /// # Example
830    ///
831    /// ```rust,ignore
832    /// use adk_sandbox::workspace::{SandboxConfig, Capability, Manifest};
833    /// use std::collections::HashSet;
834    /// use std::sync::Arc;
835    /// use std::time::Duration;
836    ///
837    /// let config = SandboxConfig {
838    ///     client: Arc::new(my_client),
839    ///     manifest: Manifest { entries: vec![] },
840    ///     capabilities: HashSet::from([Capability::Shell, Capability::Filesystem]),
841    ///     snapshot_on_stop: true,
842    ///     session_timeout: Duration::from_secs(600),
843    ///     command_timeout: Duration::from_secs(120),
844    /// };
845    ///
846    /// let agent = LlmAgentBuilder::new("coding-agent")
847    ///     .model(model)
848    ///     .sandbox_config(config)
849    ///     .build()?;
850    /// ```
851    #[cfg(feature = "sandbox")]
852    pub fn sandbox_config(mut self, config: adk_sandbox::workspace::SandboxConfig) -> Self {
853        self.sandbox_config = Some(config);
854        self
855    }
856
857    /// Build the [`LlmAgent`], returning an error if no model was set.
858    pub fn build(self) -> Result<LlmAgent> {
859        let model = self.model.ok_or_else(|| adk_core::AdkError::agent("Model is required"))?;
860
861        let mut seen_names = std::collections::HashSet::new();
862        for agent in &self.sub_agents {
863            if !seen_names.insert(agent.name()) {
864                return Err(adk_core::AdkError::agent(format!(
865                    "Duplicate sub-agent name: {}",
866                    agent.name()
867                )));
868            }
869        }
870
871        // Validate: Gemini Interactions API + client-side sandbox tools conflict.
872        // These provide competing filesystems and would produce nondeterministic behavior.
873        #[cfg(feature = "sandbox")]
874        if let Some(ref sandbox_cfg) = self.sandbox_config {
875            use adk_sandbox::workspace::Capability;
876            if model.uses_interactions_api()
877                && (sandbox_cfg.capabilities.contains(&Capability::Shell)
878                    || sandbox_cfg.capabilities.contains(&Capability::Filesystem))
879            {
880                return Err(adk_core::AdkError::new(
881                    adk_core::ErrorComponent::Agent,
882                    adk_core::ErrorCategory::InvalidInput,
883                    "code.gemini_interactions_conflict",
884                    "Cannot combine Gemini Interactions API (server-managed environment) \
885                     with client-side sandbox tools (Shell/Filesystem). These provide \
886                     competing filesystems and would produce nondeterministic behavior. \
887                     Either disable use_interactions_api or remove sandbox capabilities.",
888                ));
889            }
890        }
891
892        // Construct EnhancedPluginManager only when plugins are registered (zero overhead otherwise)
893        #[cfg(feature = "enhanced-plugins")]
894        let enhanced_plugin_manager = if self.enhanced_plugins.is_empty() {
895            None
896        } else {
897            Some(Arc::new(EnhancedPluginManager::new(self.enhanced_plugins)))
898        };
899
900        Ok(LlmAgent {
901            name: self.name,
902            description: self.description.unwrap_or_default(),
903            model,
904            instruction: self.instruction,
905            instruction_provider: self.instruction_provider,
906            global_instruction: self.global_instruction,
907            global_instruction_provider: self.global_instruction_provider,
908            skills_index: self.skills_index,
909            skill_policy: self.skill_policy,
910            max_skill_chars: self.max_skill_chars,
911            input_schema: self.input_schema,
912            output_schema: self.output_schema,
913            output_max_retries: self.output_max_retries,
914            disallow_transfer_to_parent: self.disallow_transfer_to_parent,
915            disallow_transfer_to_peers: self.disallow_transfer_to_peers,
916            include_contents: self.include_contents,
917            tools: self.tools,
918            toolsets: self.toolsets,
919            sub_agents: self.sub_agents,
920            output_key: self.output_key,
921            generate_content_config: self.generate_content_config,
922            max_iterations: self.max_iterations,
923            tool_timeout: self.tool_timeout,
924            before_callbacks: Arc::new(self.before_callbacks),
925            after_callbacks: Arc::new(self.after_callbacks),
926            before_model_callbacks: Arc::new(self.before_model_callbacks),
927            after_model_callbacks: Arc::new(self.after_model_callbacks),
928            before_tool_callbacks: Arc::new(self.before_tool_callbacks),
929            after_tool_callbacks: Arc::new(self.after_tool_callbacks),
930            on_tool_error_callbacks: Arc::new(self.on_tool_error_callbacks),
931            after_tool_callbacks_full: Arc::new(self.after_tool_callbacks_full),
932            default_retry_budget: self.default_retry_budget,
933            tool_retry_budgets: self.tool_retry_budgets,
934            circuit_breaker_threshold: self.circuit_breaker_threshold,
935            tool_confirmation_policy: self.tool_confirmation_policy,
936            tool_execution_strategy: self.tool_execution_strategy,
937            input_guardrails: Arc::new(self.input_guardrails),
938            output_guardrails: Arc::new(self.output_guardrails),
939            #[cfg(feature = "enhanced-plugins")]
940            enhanced_plugin_manager,
941            #[cfg(feature = "sandbox")]
942            sandbox_config: self.sandbox_config,
943        })
944    }
945}
946
947// AgentToolContext wraps the parent InvocationContext and preserves all context
948// instead of throwing it away like SimpleToolContext did
949struct AgentToolContext {
950    parent_ctx: Arc<dyn InvocationContext>,
951    function_call_id: String,
952    actions: Mutex<EventActions>,
953}
954
955impl AgentToolContext {
956    fn new(parent_ctx: Arc<dyn InvocationContext>, function_call_id: String) -> Self {
957        Self { parent_ctx, function_call_id, actions: Mutex::new(EventActions::default()) }
958    }
959
960    fn actions_guard(&self) -> std::sync::MutexGuard<'_, EventActions> {
961        self.actions.lock().unwrap_or_else(|e| e.into_inner())
962    }
963}
964
965#[async_trait]
966impl ReadonlyContext for AgentToolContext {
967    fn invocation_id(&self) -> &str {
968        self.parent_ctx.invocation_id()
969    }
970
971    fn agent_name(&self) -> &str {
972        self.parent_ctx.agent_name()
973    }
974
975    fn user_id(&self) -> &str {
976        // ✅ Delegate to parent - now tools get the real user_id!
977        self.parent_ctx.user_id()
978    }
979
980    fn app_name(&self) -> &str {
981        // ✅ Delegate to parent - now tools get the real app_name!
982        self.parent_ctx.app_name()
983    }
984
985    fn session_id(&self) -> &str {
986        // ✅ Delegate to parent - now tools get the real session_id!
987        self.parent_ctx.session_id()
988    }
989
990    fn branch(&self) -> &str {
991        self.parent_ctx.branch()
992    }
993
994    fn user_content(&self) -> &Content {
995        self.parent_ctx.user_content()
996    }
997}
998
999#[async_trait]
1000impl CallbackContext for AgentToolContext {
1001    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
1002        // ✅ Delegate to parent - tools can now access artifacts!
1003        self.parent_ctx.artifacts()
1004    }
1005
1006    fn shared_state(&self) -> Option<Arc<adk_core::SharedState>> {
1007        self.parent_ctx.shared_state()
1008    }
1009}
1010
1011#[async_trait]
1012impl ToolContext for AgentToolContext {
1013    fn function_call_id(&self) -> &str {
1014        &self.function_call_id
1015    }
1016
1017    fn actions(&self) -> EventActions {
1018        self.actions_guard().clone()
1019    }
1020
1021    fn set_actions(&self, actions: EventActions) {
1022        *self.actions_guard() = actions;
1023    }
1024
1025    async fn search_memory(&self, query: &str) -> Result<Vec<MemoryEntry>> {
1026        // ✅ Delegate to parent's memory if available
1027        if let Some(memory) = self.parent_ctx.memory() {
1028            memory.search(query).await
1029        } else {
1030            Ok(vec![])
1031        }
1032    }
1033
1034    fn user_scopes(&self) -> Vec<String> {
1035        self.parent_ctx.user_scopes()
1036    }
1037
1038    async fn get_secret(&self, name: &str) -> Result<Option<String>> {
1039        self.parent_ctx.get_secret(name).await
1040    }
1041}
1042
1043/// Wrapper that adds ToolOutcome to an existing CallbackContext.
1044/// Used only during after-tool callback invocation so callbacks
1045/// can inspect structured metadata about the completed tool execution.
1046struct ToolOutcomeCallbackContext {
1047    inner: Arc<dyn CallbackContext>,
1048    outcome: ToolOutcome,
1049}
1050
1051#[async_trait]
1052impl ReadonlyContext for ToolOutcomeCallbackContext {
1053    fn invocation_id(&self) -> &str {
1054        self.inner.invocation_id()
1055    }
1056
1057    fn agent_name(&self) -> &str {
1058        self.inner.agent_name()
1059    }
1060
1061    fn user_id(&self) -> &str {
1062        self.inner.user_id()
1063    }
1064
1065    fn app_name(&self) -> &str {
1066        self.inner.app_name()
1067    }
1068
1069    fn session_id(&self) -> &str {
1070        self.inner.session_id()
1071    }
1072
1073    fn branch(&self) -> &str {
1074        self.inner.branch()
1075    }
1076
1077    fn user_content(&self) -> &Content {
1078        self.inner.user_content()
1079    }
1080}
1081
1082#[async_trait]
1083impl CallbackContext for ToolOutcomeCallbackContext {
1084    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
1085        self.inner.artifacts()
1086    }
1087
1088    fn tool_outcome(&self) -> Option<ToolOutcome> {
1089        Some(self.outcome.clone())
1090    }
1091}
1092
1093/// Per-invocation circuit breaker state.
1094///
1095/// Tracks consecutive failures per tool name within a single agent
1096/// invocation. When a tool's consecutive failure count reaches the
1097/// configured threshold the breaker "opens" and subsequent calls to
1098/// that tool are short-circuited with an immediate error response.
1099///
1100/// The state is created fresh at the start of each `run()` call so
1101/// it automatically resets between invocations.
1102struct CircuitBreakerState {
1103    threshold: u32,
1104    /// tool_name → consecutive failure count
1105    failures: std::collections::HashMap<String, u32>,
1106}
1107
1108impl CircuitBreakerState {
1109    fn new(threshold: u32) -> Self {
1110        Self { threshold, failures: std::collections::HashMap::new() }
1111    }
1112
1113    /// Returns `true` if the tool is currently tripped (open state).
1114    fn is_open(&self, tool_name: &str) -> bool {
1115        self.failures.get(tool_name).copied().unwrap_or(0) >= self.threshold
1116    }
1117
1118    /// Record a tool outcome. Resets count on success, increments on failure.
1119    fn record(&mut self, outcome: &ToolOutcome) {
1120        if outcome.success {
1121            self.failures.remove(&outcome.tool_name);
1122        } else {
1123            let count = self.failures.entry(outcome.tool_name.clone()).or_insert(0);
1124            *count += 1;
1125        }
1126    }
1127}
1128
1129#[async_trait]
1130impl Agent for LlmAgent {
1131    fn name(&self) -> &str {
1132        &self.name
1133    }
1134
1135    fn description(&self) -> &str {
1136        &self.description
1137    }
1138
1139    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
1140        &self.sub_agents
1141    }
1142
1143    #[adk_telemetry::instrument(
1144        skip(self, ctx),
1145        fields(
1146            agent.name = %self.name,
1147            agent.description = %self.description,
1148            invocation.id = %ctx.invocation_id(),
1149            user.id = %ctx.user_id(),
1150            session.id = %ctx.session_id()
1151        )
1152    )]
1153    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<adk_core::EventStream> {
1154        adk_telemetry::info!("Starting agent execution");
1155        let ctx = Self::apply_input_guardrails(ctx, self.input_guardrails.clone()).await?;
1156
1157        let agent_name = self.name.clone();
1158        let invocation_id = ctx.invocation_id().to_string();
1159        let model = self.model.clone();
1160        let tools = self.tools.clone();
1161        let toolsets = self.toolsets.clone();
1162        let sub_agents = self.sub_agents.clone();
1163
1164        let instruction = self.instruction.clone();
1165        let instruction_provider = self.instruction_provider.clone();
1166        let global_instruction = self.global_instruction.clone();
1167        let global_instruction_provider = self.global_instruction_provider.clone();
1168        let skills_index = self.skills_index.clone();
1169        let skill_policy = self.skill_policy.clone();
1170        let max_skill_chars = self.max_skill_chars;
1171        let output_key = self.output_key.clone();
1172        let output_schema = self.output_schema.clone();
1173        let output_max_retries = self.output_max_retries;
1174        let generate_content_config = self.generate_content_config.clone();
1175        let include_contents = self.include_contents;
1176        let max_iterations = self.max_iterations;
1177        let tool_timeout = self.tool_timeout;
1178        // Clone Arc references (cheap)
1179        let before_agent_callbacks = self.before_callbacks.clone();
1180        let after_agent_callbacks = self.after_callbacks.clone();
1181        let before_model_callbacks = self.before_model_callbacks.clone();
1182        let after_model_callbacks = self.after_model_callbacks.clone();
1183        let before_tool_callbacks = self.before_tool_callbacks.clone();
1184        let after_tool_callbacks = self.after_tool_callbacks.clone();
1185        let on_tool_error_callbacks = self.on_tool_error_callbacks.clone();
1186        let after_tool_callbacks_full = self.after_tool_callbacks_full.clone();
1187        let default_retry_budget = self.default_retry_budget.clone();
1188        let tool_retry_budgets = self.tool_retry_budgets.clone();
1189        let circuit_breaker_threshold = self.circuit_breaker_threshold;
1190        let tool_confirmation_policy = self.tool_confirmation_policy.clone();
1191        let disallow_transfer_to_parent = self.disallow_transfer_to_parent;
1192        let disallow_transfer_to_peers = self.disallow_transfer_to_peers;
1193        let output_guardrails = self.output_guardrails.clone();
1194        let agent_tool_execution_strategy = self.tool_execution_strategy;
1195        #[cfg(feature = "enhanced-plugins")]
1196        let enhanced_plugin_manager = self.enhanced_plugin_manager.clone();
1197
1198        let s = stream! {
1199            // ===== BEFORE AGENT CALLBACKS =====
1200            // Execute before the agent starts running
1201            // If any returns content, skip agent execution
1202            for callback in before_agent_callbacks.as_ref() {
1203                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1204                    Ok(Some(content)) => {
1205                        // Callback returned content - yield it and skip agent execution
1206                        let mut early_event = Event::new(&invocation_id);
1207                        early_event.author = agent_name.clone();
1208                        early_event.llm_response.content = Some(content);
1209                        yield Ok(early_event);
1210
1211                        // Skip rest of agent execution and go to after callbacks
1212                        for after_callback in after_agent_callbacks.as_ref() {
1213                            match after_callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1214                                Ok(Some(after_content)) => {
1215                                    let mut after_event = Event::new(&invocation_id);
1216                                    after_event.author = agent_name.clone();
1217                                    after_event.llm_response.content = Some(after_content);
1218                                    yield Ok(after_event);
1219                                    return;
1220                                }
1221                                Ok(None) => continue,
1222                                Err(e) => {
1223                                    yield Err(e);
1224                                    return;
1225                                }
1226                            }
1227                        }
1228                        return;
1229                    }
1230                    Ok(None) => {
1231                        // Continue to next callback
1232                        continue;
1233                    }
1234                    Err(e) => {
1235                        // Callback failed - propagate error
1236                        yield Err(e);
1237                        return;
1238                    }
1239                }
1240            }
1241
1242            // ===== MAIN AGENT EXECUTION =====
1243            let mut prompt_preamble = Vec::new();
1244
1245            // ===== PROCESS SKILL CONTEXT =====
1246            // If skills are configured, select the most relevant skill from user input
1247            // and inject it as a compact instruction block before other prompts.
1248            if let Some(index) = &skills_index {
1249                let user_query = ctx
1250                    .user_content()
1251                    .parts
1252                    .iter()
1253                    .filter_map(|part| match part {
1254                        Part::Text { text } => Some(text.as_str()),
1255                        _ => None,
1256                    })
1257                    .collect::<Vec<_>>()
1258                    .join("\n");
1259
1260                if let Some((_matched, skill_block)) = select_skill_prompt_block(
1261                    index.as_ref(),
1262                    &user_query,
1263                    &skill_policy,
1264                    max_skill_chars,
1265                ) {
1266                    prompt_preamble.push(Content {
1267                        role: "user".to_string(),
1268                        parts: vec![Part::Text { text: skill_block }],
1269                    });
1270                }
1271            }
1272
1273            // ===== PROCESS GLOBAL INSTRUCTION =====
1274            // GlobalInstruction provides tree-wide personality/identity
1275            if let Some(provider) = &global_instruction_provider {
1276                // Dynamic global instruction via provider
1277                let global_inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
1278                if !global_inst.is_empty() {
1279                    prompt_preamble.push(Content {
1280                        role: "user".to_string(),
1281                        parts: vec![Part::Text { text: global_inst }],
1282                    });
1283                }
1284            } else if let Some(ref template) = global_instruction {
1285                // Static global instruction with template injection
1286                let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
1287                if !processed.is_empty() {
1288                    prompt_preamble.push(Content {
1289                        role: "user".to_string(),
1290                        parts: vec![Part::Text { text: processed }],
1291                    });
1292                }
1293            }
1294
1295            // ===== PROCESS AGENT INSTRUCTION =====
1296            // Agent-specific instruction
1297            if let Some(provider) = &instruction_provider {
1298                // Dynamic instruction via provider
1299                let inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
1300                if !inst.is_empty() {
1301                    prompt_preamble.push(Content {
1302                        role: "user".to_string(),
1303                        parts: vec![Part::Text { text: inst }],
1304                    });
1305                }
1306            } else if let Some(ref template) = instruction {
1307                // Static instruction with template injection
1308                let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
1309                if !processed.is_empty() {
1310                    prompt_preamble.push(Content {
1311                        role: "user".to_string(),
1312                        parts: vec![Part::Text { text: processed }],
1313                    });
1314                }
1315            }
1316
1317            // ===== OUTPUT SCHEMA INSTRUCTION INJECTION =====
1318            // When output_schema is set, append a directive instructing the LLM
1319            // to respond with valid JSON conforming to the schema.
1320            if let Some(ref schema) = output_schema {
1321                let schema_instruction = format!(
1322                    "You MUST respond with valid JSON conforming to this schema: {}. Do not include any text outside the JSON object.",
1323                    schema
1324                );
1325                prompt_preamble.push(Content {
1326                    role: "user".to_string(),
1327                    parts: vec![Part::Text { text: schema_instruction }],
1328                });
1329            }
1330
1331            // ===== LOAD SESSION HISTORY =====
1332            // Load previous conversation turns from the session
1333            // NOTE: Session history already includes the current user message (added by Runner before agent runs)
1334            // When transfer_targets is set, this agent was invoked via transfer — filter out
1335            // other agents' events so the LLM doesn't see the parent's tool calls as its own.
1336            let session_history = if !ctx.run_config().transfer_targets.is_empty() {
1337                ctx.session().conversation_history_for_agent(&agent_name)
1338            } else {
1339                ctx.session().conversation_history()
1340            };
1341            let mut session_history = session_history;
1342            let current_user_content = ctx.user_content().clone();
1343            if let Some(index) = session_history.iter().rposition(|content| content.role == "user") {
1344                session_history[index] = current_user_content.clone();
1345            } else {
1346                session_history.push(current_user_content.clone());
1347            }
1348
1349            // ===== APPLY INCLUDE_CONTENTS FILTERING =====
1350            // Control what conversation history the agent sees
1351            let mut conversation_history = match include_contents {
1352                adk_core::IncludeContents::None => {
1353                    let mut filtered = prompt_preamble.clone();
1354                    filtered.push(current_user_content);
1355                    filtered
1356                }
1357                adk_core::IncludeContents::Default => {
1358                    let mut full_history = prompt_preamble;
1359                    full_history.extend(session_history);
1360                    full_history
1361                }
1362            };
1363
1364            // ===== RESOLVE TOOLSETS =====
1365            // Start with static tools, then merge in toolset-provided tools
1366            let mut resolved_tools: Vec<Arc<dyn Tool>> = tools.clone();
1367            let static_tool_names: std::collections::HashSet<String> =
1368                tools.iter().map(|t| t.name().to_string()).collect();
1369
1370            // Track which toolset provided each tool for deterministic error messages
1371            let mut toolset_source: std::collections::HashMap<String, String> =
1372                std::collections::HashMap::new();
1373
1374            for toolset in &toolsets {
1375                let toolset_tools = match toolset
1376                    .tools(ctx.clone() as Arc<dyn ReadonlyContext>)
1377                    .await
1378                {
1379                    Ok(t) => t,
1380                    Err(e) => {
1381                        yield Err(e);
1382                        return;
1383                    }
1384                };
1385                for tool in &toolset_tools {
1386                    let name = tool.name().to_string();
1387                    // Check static-vs-toolset conflict
1388                    if static_tool_names.contains(&name) {
1389                        yield Err(adk_core::AdkError::agent(format!(
1390                            "Duplicate tool name '{name}': conflict between static tool and toolset '{}'",
1391                            toolset.name()
1392                        )));
1393                        return;
1394                    }
1395                    // Check toolset-vs-toolset conflict
1396                    if let Some(other_toolset_name) = toolset_source.get(&name) {
1397                        yield Err(adk_core::AdkError::agent(format!(
1398                            "Duplicate tool name '{name}': conflict between toolset '{}' and toolset '{}'",
1399                            other_toolset_name,
1400                            toolset.name()
1401                        )));
1402                        return;
1403                    }
1404                    toolset_source.insert(name, toolset.name().to_string());
1405                    resolved_tools.push(tool.clone());
1406                }
1407            }
1408
1409            // Build tool lookup map for O(1) access from merged resolved_tools
1410            let tool_map: std::collections::HashMap<String, Arc<dyn Tool>> = resolved_tools
1411                .iter()
1412                .map(|t| (t.name().to_string(), t.clone()))
1413                .collect();
1414
1415            // Helper: extract long-running tool IDs from content
1416            let collect_long_running_ids = |content: &Content| -> Vec<String> {
1417                content.parts.iter()
1418                    .filter_map(|p| {
1419                        if let Part::FunctionCall { name, .. } = p
1420                            && let Some(tool) = tool_map.get(name)
1421                            && tool.is_long_running()
1422                        {
1423                            return Some(name.clone());
1424                        }
1425                        None
1426                    })
1427                    .collect()
1428            };
1429
1430            // Build tool declarations for Gemini
1431            // Uses Tool::declaration() so provider-native built-ins can attach
1432            // adapter-specific metadata while regular function tools retain the
1433            // standard name/description/schema shape.
1434            let mut tool_declarations = std::collections::HashMap::new();
1435            for tool in &resolved_tools {
1436                tool_declarations.insert(tool.name().to_string(), tool.declaration());
1437            }
1438
1439            // Build the list of valid transfer targets.
1440            // Sources: sub_agents (always) + transfer_targets from RunConfig
1441            // (set by the runner to include parent/peers for transferred agents).
1442            // Apply disallow_transfer_to_parent / disallow_transfer_to_peers filtering.
1443            let mut valid_transfer_targets: Vec<String> = sub_agents
1444                .iter()
1445                .map(|a| a.name().to_string())
1446                .collect();
1447
1448            // Merge in runner-provided targets (parent, peers) from RunConfig
1449            let run_config_targets = &ctx.run_config().transfer_targets;
1450            let parent_agent_name = ctx.run_config().parent_agent.clone();
1451            let sub_agent_names: std::collections::HashSet<&str> = sub_agents
1452                .iter()
1453                .map(|a| a.name())
1454                .collect();
1455
1456            for target in run_config_targets {
1457                // Skip if already in the list (from sub_agents)
1458                if sub_agent_names.contains(target.as_str()) {
1459                    continue;
1460                }
1461
1462                // Apply disallow flags
1463                let is_parent = parent_agent_name.as_deref() == Some(target.as_str());
1464                if is_parent && disallow_transfer_to_parent {
1465                    continue;
1466                }
1467                if !is_parent && disallow_transfer_to_peers {
1468                    continue;
1469                }
1470
1471                valid_transfer_targets.push(target.clone());
1472            }
1473
1474            // Inject transfer_to_agent tool if there are valid targets
1475            if !valid_transfer_targets.is_empty() {
1476                let transfer_tool_name = "transfer_to_agent";
1477                let transfer_tool_decl = serde_json::json!({
1478                    "name": transfer_tool_name,
1479                    "description": format!(
1480                        "Transfer execution to another agent. Valid targets: {}",
1481                        valid_transfer_targets.join(", ")
1482                    ),
1483                    "parameters": {
1484                        "type": "object",
1485                        "properties": {
1486                            "agent_name": {
1487                                "type": "string",
1488                                "description": "The name of the agent to transfer to.",
1489                                "enum": valid_transfer_targets
1490                            }
1491                        },
1492                        "required": ["agent_name"]
1493                    }
1494                });
1495                tool_declarations.insert(transfer_tool_name.to_string(), transfer_tool_decl);
1496            }
1497
1498
1499            // ===== CIRCUIT BREAKER STATE =====
1500            // Created fresh per invocation so it resets between runs.
1501            let mut circuit_breaker_state = circuit_breaker_threshold.map(CircuitBreakerState::new);
1502
1503            // ===== RESPONSE-ID CONTINUITY (provider-neutral) =====
1504            // Tracks the `interaction_id` carried by the most recent model
1505            // response so the next request can continue the conversation via
1506            // `LlmRequest.previous_response_id`. This is generic plumbing: it
1507            // contains no Gemini- or transport-specific logic. Providers that
1508            // do not support response chaining leave `interaction_id` as `None`,
1509            // so this stays `None` and `previous_response_id` is never set
1510            // (a no-op for generateContent and all other providers).
1511            let mut last_interaction_id: Option<String> = None;
1512
1513            // Multi-turn loop with max iterations
1514            let mut iteration = 0;
1515            let mut schema_retry_count: usize = 0;
1516
1517            loop {
1518                iteration += 1;
1519                if iteration > max_iterations {
1520                    yield Err(adk_core::AdkError::agent(
1521                        format!("Max iterations ({max_iterations}) exceeded")
1522                    ));
1523                    return;
1524                }
1525
1526                // Build request with conversation history
1527                // Merge agent-level generate_content_config with output_schema.
1528                // Agent-level config provides defaults (temperature, top_p, etc.),
1529                // output_schema is layered on top as response_schema.
1530                // If the runner set a cached_content name (via automatic cache lifecycle),
1531                // merge it into the config so the provider can reuse cached content.
1532                let config = match (&generate_content_config, &output_schema) {
1533                    (Some(base), Some(schema)) => {
1534                        let mut merged = base.clone();
1535                        merged.response_schema = Some(schema.clone());
1536                        Some(merged)
1537                    }
1538                    (Some(base), None) => Some(base.clone()),
1539                    (None, Some(schema)) => Some(adk_core::GenerateContentConfig {
1540                        response_schema: Some(schema.clone()),
1541                        ..Default::default()
1542                    }),
1543                    (None, None) => None,
1544                };
1545
1546                // Layer cached_content from RunConfig onto the request config.
1547                let config = if let Some(ref cached) = ctx.run_config().cached_content {
1548                    let mut cfg = config.unwrap_or_default();
1549                    // Only set if the agent hasn't already specified one
1550                    if cfg.cached_content.is_none() {
1551                        cfg.cached_content = Some(cached.clone());
1552                    }
1553                    Some(cfg)
1554                } else {
1555                    config
1556                };
1557
1558                let request = LlmRequest {
1559                    model: model.name().to_string(),
1560                    contents: conversation_history.clone(),
1561                    tools: tool_declarations.clone(),
1562                    config,
1563                    // Provider-neutral continuity: carry the most recent
1564                    // response's `interaction_id` forward so transports that
1565                    // support response chaining (e.g. the Gemini Interactions
1566                    // transport, which maps this to `previous_interaction_id`)
1567                    // can continue server-side. `None` for the first turn and
1568                    // for providers that never populate `interaction_id`.
1569                    previous_response_id: last_interaction_id.clone(),
1570                };
1571
1572                // ===== ENHANCED PLUGIN: BEFORE MODEL CALL =====
1573                // Enhanced plugins can modify the request or short-circuit the model call.
1574                // They run before legacy before_model_callbacks.
1575                #[cfg(feature = "enhanced-plugins")]
1576                let (request, model_response_override_from_plugin) = {
1577                    if let Some(epm) = &enhanced_plugin_manager {
1578                        match epm.run_before_model_call(request, ctx.clone() as Arc<dyn CallbackContext>).await {
1579                            Ok(BeforeModelCallResult::Continue(modified_request)) => {
1580                                (modified_request, None)
1581                            }
1582                            Ok(BeforeModelCallResult::ShortCircuit(response)) => {
1583                                // Use a default request since we're short-circuiting
1584                                (LlmRequest::new("", vec![]), Some(response))
1585                            }
1586                            Err(e) => {
1587                                yield Err(e);
1588                                return;
1589                            }
1590                        }
1591                    } else {
1592                        (request, None)
1593                    }
1594                };
1595                #[cfg(not(feature = "enhanced-plugins"))]
1596                let model_response_override_from_plugin: Option<LlmResponse> = None;
1597
1598                // ===== BEFORE MODEL CALLBACKS =====
1599                // These can modify the request or skip the model call by returning a response
1600                let mut current_request = request;
1601                let mut model_response_override = model_response_override_from_plugin;
1602                if model_response_override.is_none() {
1603                    for callback in before_model_callbacks.as_ref() {
1604                        match callback(ctx.clone() as Arc<dyn CallbackContext>, current_request.clone()).await {
1605                            Ok(BeforeModelResult::Continue(modified_request)) => {
1606                                // Callback may have modified the request, continue with it
1607                                current_request = modified_request;
1608                            }
1609                            Ok(BeforeModelResult::Skip(response)) => {
1610                                // Callback returned a response - skip model call
1611                                model_response_override = Some(response);
1612                                break;
1613                            }
1614                            Err(e) => {
1615                                // Callback failed - propagate error
1616                                yield Err(e);
1617                                return;
1618                            }
1619                        }
1620                    }
1621                }
1622                let request = current_request;
1623
1624                // Determine streaming source: cached response or real model
1625                let mut accumulated_content: Option<Content> = None;
1626                let mut final_provider_metadata: Option<serde_json::Value> = None;
1627
1628                if let Some(cached_response) = model_response_override {
1629                    // Use callback-provided response (e.g., from cache)
1630                    // Yield it as an event
1631                    accumulated_content = cached_response.content.clone();
1632                    final_provider_metadata = cached_response.provider_metadata.clone();
1633                    normalize_option_content(&mut accumulated_content);
1634                    if let Some(content) = accumulated_content.take() {
1635                        let has_function_calls = content
1636                            .parts
1637                            .iter()
1638                            .any(|part| matches!(part, Part::FunctionCall { .. }));
1639                        let content = if has_function_calls {
1640                            content
1641                        } else {
1642                            Self::apply_output_guardrails(output_guardrails.as_ref(), content).await?
1643                        };
1644                        accumulated_content = Some(content);
1645                    }
1646
1647                    let mut cached_event = Event::new(&invocation_id);
1648                    cached_event.author = agent_name.clone();
1649                    cached_event.llm_response.content = accumulated_content.clone();
1650                    cached_event.llm_response.provider_metadata = cached_response.provider_metadata.clone();
1651                    // Surface and track the response id for provider-neutral continuity.
1652                    cached_event.llm_response.interaction_id = cached_response.interaction_id.clone();
1653                    if cached_response.interaction_id.is_some() {
1654                        last_interaction_id = cached_response.interaction_id.clone();
1655                    }
1656                    cached_event.llm_request = Some(serde_json::to_string(&request).unwrap_or_default());
1657                    cached_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), serde_json::to_string(&request).unwrap_or_default());
1658                    cached_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(&cached_response).unwrap_or_default());
1659
1660                    // Populate long_running_tool_ids for function calls from long-running tools
1661                    if let Some(ref content) = accumulated_content {
1662                        cached_event.long_running_tool_ids = collect_long_running_ids(content);
1663                    }
1664
1665                    yield Ok(cached_event);
1666                } else {
1667                    // Record LLM request for tracing
1668                    let request_json = serde_json::to_string(&request).unwrap_or_default();
1669                    let trace_request_json = trace_json_payload(
1670                        &request,
1671                        ctx.run_config().record_payloads,
1672                        ctx.run_config().trace_payload_max_bytes,
1673                    );
1674
1675                    // Create call_llm span with GCP attributes (works for all model types)
1676                    let llm_ts = std::time::SystemTime::now()
1677                        .duration_since(std::time::UNIX_EPOCH)
1678                        .unwrap_or_default()
1679                        .as_nanos();
1680                    let llm_event_id = format!("{}_llm_{}", invocation_id, llm_ts);
1681                    let llm_span = tracing::info_span!(
1682                        "call_llm",
1683                        "gcp.vertex.agent.event_id" = %llm_event_id,
1684                        "gcp.vertex.agent.invocation_id" = %invocation_id,
1685                        "gcp.vertex.agent.session_id" = %ctx.session_id(),
1686                        "gen_ai.conversation.id" = %ctx.session_id(),
1687                        "gcp.vertex.agent.llm_request" = %trace_request_json,
1688                        "gcp.vertex.agent.llm_response" = tracing::field::Empty  // Placeholder for later recording
1689                    );
1690                    let _llm_guard = llm_span.enter();
1691
1692                    // Check streaming mode from run config
1693                    use adk_core::StreamingMode;
1694                    let streaming_mode = ctx.run_config().streaming_mode;
1695                    let should_stream_to_client = matches!(streaming_mode, StreamingMode::SSE | StreamingMode::Bidi)
1696                        && output_guardrails.is_empty();
1697
1698                    // Always use streaming internally for LLM calls
1699                    let mut response_stream = model.generate_content(request, true).await?;
1700
1701                    use futures::StreamExt;
1702
1703                    // Track last chunk for final event metadata (used in None mode)
1704                    let mut last_chunk: Option<LlmResponse> = None;
1705
1706                    // Stream and process chunks with AfterModel callbacks
1707                    while let Some(chunk_result) = response_stream.next().await {
1708                        let mut chunk = match chunk_result {
1709                            Ok(c) => c,
1710                            Err(e) => {
1711                                yield Err(e);
1712                                return;
1713                            }
1714                        };
1715
1716                        // ===== AFTER MODEL CALLBACKS (per chunk) =====
1717                        // Callbacks can modify each streaming chunk
1718                        for callback in after_model_callbacks.as_ref() {
1719                            match callback(ctx.clone() as Arc<dyn CallbackContext>, chunk.clone()).await {
1720                                Ok(Some(modified_chunk)) => {
1721                                    // Callback modified this chunk
1722                                    chunk = modified_chunk;
1723                                    break;
1724                                }
1725                                Ok(None) => {
1726                                    // Continue to next callback
1727                                    continue;
1728                                }
1729                                Err(e) => {
1730                                    // Callback failed - propagate error
1731                                    yield Err(e);
1732                                    return;
1733                                }
1734                            }
1735                        }
1736
1737                        normalize_option_content(&mut chunk.content);
1738
1739                        // Accumulate content for conversation history (always needed)
1740                        if let Some(chunk_content) = chunk.content.clone() {
1741                            if let Some(ref mut acc) = accumulated_content {
1742                                acc.parts.extend(chunk_content.parts);
1743                            } else {
1744                                accumulated_content = Some(chunk_content);
1745                            }
1746                        }
1747
1748                        // For SSE/Bidi mode: yield each chunk immediately with stable event ID
1749                        if should_stream_to_client {
1750                            let mut partial_event = Event::with_id(&llm_event_id, &invocation_id);
1751                            partial_event.author = agent_name.clone();
1752                            partial_event.llm_request = Some(request_json.clone());
1753                            partial_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), request_json.clone());
1754                            partial_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(&chunk).unwrap_or_default());
1755                            partial_event.llm_response.partial = chunk.partial;
1756                            partial_event.llm_response.turn_complete = chunk.turn_complete;
1757                            partial_event.llm_response.finish_reason = chunk.finish_reason;
1758                            partial_event.llm_response.usage_metadata = chunk.usage_metadata.clone();
1759                            partial_event.llm_response.content = chunk.content.clone();
1760                            partial_event.llm_response.provider_metadata = chunk.provider_metadata.clone();
1761                            partial_event.llm_response.interaction_id = chunk.interaction_id.clone();
1762
1763                            // Populate long_running_tool_ids
1764                            if let Some(ref content) = chunk.content {
1765                                partial_event.long_running_tool_ids = collect_long_running_ids(content);
1766                            }
1767
1768                            yield Ok(partial_event);
1769                        }
1770
1771                        // Track the response id for provider-neutral continuity.
1772                        // Transports that support response chaining populate
1773                        // `interaction_id`; others leave it `None` (no-op).
1774                        if chunk.interaction_id.is_some() {
1775                            last_interaction_id = chunk.interaction_id.clone();
1776                        }
1777
1778                        // Store last chunk for final event metadata
1779                        last_chunk = Some(chunk.clone());
1780
1781                        // Check if turn is complete
1782                        if chunk.turn_complete {
1783                            break;
1784                        }
1785                    }
1786
1787                    // For None mode: yield single final event with accumulated content
1788                    if !should_stream_to_client {
1789                        if let Some(content) = accumulated_content.take() {
1790                            let has_function_calls = content
1791                                .parts
1792                                .iter()
1793                                .any(|part| matches!(part, Part::FunctionCall { .. }));
1794                            let content = if has_function_calls {
1795                                content
1796                            } else {
1797                                Self::apply_output_guardrails(output_guardrails.as_ref(), content).await?
1798                            };
1799                            accumulated_content = Some(content);
1800                        }
1801
1802                        let mut final_event = Event::with_id(&llm_event_id, &invocation_id);
1803                        final_event.author = agent_name.clone();
1804                        final_event.llm_request = Some(request_json.clone());
1805                        final_event.provider_metadata.insert("gcp.vertex.agent.llm_request".to_string(), request_json.clone());
1806                        final_event.llm_response.content = accumulated_content.clone();
1807                        final_event.llm_response.partial = false;
1808                        final_event.llm_response.turn_complete = true;
1809
1810                        // Copy metadata from last chunk
1811                        if let Some(ref last) = last_chunk {
1812                            final_event.llm_response.finish_reason = last.finish_reason;
1813                            final_event.llm_response.usage_metadata = last.usage_metadata.clone();
1814                            final_event.llm_response.provider_metadata = last.provider_metadata.clone();
1815                            final_event.llm_response.interaction_id = last.interaction_id.clone();
1816                            final_provider_metadata = last.provider_metadata.clone();
1817                            final_event.provider_metadata.insert("gcp.vertex.agent.llm_response".to_string(), serde_json::to_string(last).unwrap_or_default());
1818                        }
1819
1820                        // Populate long_running_tool_ids
1821                        if let Some(ref content) = accumulated_content {
1822                            final_event.long_running_tool_ids = collect_long_running_ids(content);
1823                        }
1824
1825                        yield Ok(final_event);
1826                    }
1827
1828                    // Record LLM response to span before guard drops
1829                    if let Some(ref content) = accumulated_content {
1830                        let response_json = trace_json_payload(
1831                            content,
1832                            ctx.run_config().record_payloads,
1833                            ctx.run_config().trace_payload_max_bytes,
1834                        );
1835                        llm_span.record("gcp.vertex.agent.llm_response", &response_json);
1836                    }
1837                }
1838
1839                // ===== ENHANCED PLUGIN: AFTER MODEL CALL =====
1840                // Enhanced plugins can modify the accumulated model response.
1841                // They run after the full response is accumulated (not per-chunk).
1842                #[cfg(feature = "enhanced-plugins")]
1843                if let Some(epm) = &enhanced_plugin_manager {
1844                    if let Some(ref content) = accumulated_content {
1845                        let response_for_hook = LlmResponse {
1846                            content: Some(content.clone()),
1847                            provider_metadata: final_provider_metadata.clone(),
1848                            ..Default::default()
1849                        };
1850                        match epm.run_after_model_call(response_for_hook, ctx.clone() as Arc<dyn CallbackContext>).await {
1851                            Ok(adk_plugin::AfterModelCallResult::Continue(modified_response)) => {
1852                                accumulated_content = modified_response.content;
1853                                if modified_response.provider_metadata.is_some() {
1854                                    final_provider_metadata = modified_response.provider_metadata;
1855                                }
1856                            }
1857                            Err(e) => {
1858                                yield Err(e);
1859                                return;
1860                            }
1861                        }
1862                    }
1863                }
1864
1865                // After streaming/caching completes, check for function calls in accumulated content
1866                let function_call_names: Vec<String> = accumulated_content.as_ref()
1867                    .map(|c| c.parts.iter()
1868                        .filter_map(|p| {
1869                            if let Part::FunctionCall { name, .. } = p {
1870                                Some(name.clone())
1871                            } else {
1872                                None
1873                            }
1874                        })
1875                        .collect())
1876                    .unwrap_or_default();
1877
1878                let has_function_calls = !function_call_names.is_empty();
1879
1880                // Check if ALL function calls are from long-running tools
1881                // If so, we should NOT continue the loop - the tool returned a pending status
1882                // and the agent/client will poll for completion later
1883                let all_calls_are_long_running = has_function_calls && function_call_names.iter().all(|name| {
1884                    tool_map.get(name)
1885                        .map(|t| t.is_long_running())
1886                        .unwrap_or(false)
1887                });
1888
1889                // Add final content to history
1890                if let Some(ref content) = accumulated_content {
1891                    conversation_history.push(Self::augment_content_for_history(
1892                        content,
1893                        final_provider_metadata.as_ref(),
1894                    ));
1895
1896                    // Handle output_key: save final agent output to state_delta
1897                    if let Some(ref output_key) = output_key
1898                        && !has_function_calls
1899                    {
1900                        let mut text_parts = String::new();
1901                        for part in &content.parts {
1902                            if let Part::Text { text } = part {
1903                                text_parts.push_str(text);
1904                            }
1905                        }
1906                        if !text_parts.is_empty() {
1907                            // Yield a final state update event
1908                            let mut state_event = Event::new(&invocation_id);
1909                            state_event.author = agent_name.clone();
1910                            state_event.actions.state_delta.insert(
1911                                output_key.clone(),
1912                                serde_json::Value::String(text_parts),
1913                            );
1914                            yield Ok(state_event);
1915                        }
1916                    }
1917                }
1918
1919                if !has_function_calls {
1920                    // ===== OUTPUT SCHEMA VALIDATION =====
1921                    // When output_schema is set, validate the response text against
1922                    // the schema. If invalid, retry with a correction prompt up to
1923                    // output_max_retries times.
1924                    if let Some(ref schema) = output_schema {
1925                        let text = accumulated_content
1926                            .as_ref()
1927                            .map(|c| {
1928                                c.parts
1929                                    .iter()
1930                                    .filter_map(|p| {
1931                                        if let Part::Text { text } = p {
1932                                            Some(text.as_str())
1933                                        } else {
1934                                            None
1935                                        }
1936                                    })
1937                                    .collect::<Vec<_>>()
1938                                    .join("")
1939                            })
1940                            .unwrap_or_default();
1941
1942                        if !text.is_empty()
1943                            && let Err(validation_error) = validate_output_against_schema(&text, schema)
1944                        {
1945                                if schema_retry_count >= output_max_retries {
1946                                    yield Err(adk_core::AdkError::agent(format!(
1947                                        "output schema validation failed after {} attempts",
1948                                        output_max_retries
1949                                    )));
1950                                    return;
1951                                }
1952                                schema_retry_count += 1;
1953
1954                                // Append a correction prompt and retry
1955                                let correction = format!(
1956                                    "Your output did not match the required schema. Error: {}. Please produce valid JSON matching the schema.",
1957                                    validation_error
1958                                );
1959                                conversation_history.push(Content {
1960                                    role: "user".to_string(),
1961                                    parts: vec![Part::Text { text: correction }],
1962                                });
1963                                continue;
1964                        }
1965                    }
1966
1967                    // No function calls, we're done
1968                    // Record LLM response for tracing
1969                    if let Some(ref content) = accumulated_content {
1970                        let response_json = trace_json_payload(
1971                            content,
1972                            ctx.run_config().record_payloads,
1973                            ctx.run_config().trace_payload_max_bytes,
1974                        );
1975                        tracing::Span::current().record("gcp.vertex.agent.llm_response", &response_json);
1976                    }
1977
1978                    tracing::info!(agent.name = %agent_name, "Agent execution complete");
1979                    break;
1980                }
1981
1982                // Execute function calls and add responses to history
1983                if let Some(content) = &accumulated_content {
1984                    // ===== RESOLVE TOOL EXECUTION STRATEGY =====
1985                    // Per-agent override; defaults to Sequential if not set.
1986                    let strategy = agent_tool_execution_strategy
1987                        .unwrap_or(ToolExecutionStrategy::Sequential);
1988
1989                    // Collect function call parts with original indices for
1990                    // order-preserving reassembly in parallel/auto modes.
1991                    // Tuple: (index, name, args, id, function_call_id)
1992                    let mut fc_parts: Vec<(usize, String, serde_json::Value, Option<String>, String)> = Vec::new();
1993                    {
1994                        let mut tci = 0usize;
1995                        for part in &content.parts {
1996                            if let Part::FunctionCall { name, args, id, .. } = part {
1997                                let fallback = format!("{}_{}_{}", invocation_id, name, tci);
1998                                let fcid = id.clone().unwrap_or(fallback);
1999                                fc_parts.push((tci, name.clone(), args.clone(), id.clone(), fcid));
2000                                tci += 1;
2001                            }
2002                        }
2003                    }
2004
2005                    // ===== HANDLE transfer_to_agent BEFORE DISPATCH =====
2006                    // Transfer calls cause an immediate return from the stream,
2007                    // so they must be handled inline regardless of strategy.
2008                    let mut transfer_handled = false;
2009                    for (_, fc_name, fc_args, fc_id, _) in &fc_parts {
2010                        if fc_name == "transfer_to_agent" {
2011                            let target_agent = fc_args.get("agent_name")
2012                                .and_then(|v| v.as_str())
2013                                .unwrap_or_default()
2014                                .to_string();
2015
2016                            let valid_target = valid_transfer_targets.iter().any(|n| n == &target_agent);
2017                            if !valid_target {
2018                                let error_content = Content {
2019                                    role: "function".to_string(),
2020                                    parts: vec![Part::FunctionResponse {
2021                                        function_response: FunctionResponseData::new(
2022                                            fc_name.clone(),
2023                                            serde_json::json!({
2024                                                "error": format!(
2025                                                    "Agent '{}' not found. Available agents: {:?}",
2026                                                    target_agent, valid_transfer_targets
2027                                                )
2028                                            }),
2029                                        ),
2030                                        id: fc_id.clone(),
2031                                    }],
2032                                };
2033                                conversation_history.push(error_content.clone());
2034                                let mut error_event = Event::new(&invocation_id);
2035                                error_event.author = agent_name.clone();
2036                                error_event.llm_response.content = Some(error_content);
2037                                yield Ok(error_event);
2038                                continue;
2039                            }
2040
2041                            let mut transfer_event = Event::new(&invocation_id);
2042                            transfer_event.author = agent_name.clone();
2043                            transfer_event.actions.transfer_to_agent = Some(target_agent);
2044                            yield Ok(transfer_event);
2045                            transfer_handled = true;
2046                            break;
2047                        }
2048                    }
2049                    if transfer_handled {
2050                        return;
2051                    }
2052
2053                    // Filter out transfer_to_agent and built-in tools
2054                    let fc_parts: Vec<_> = fc_parts.into_iter().filter(|(_, fc_name, _, _, _)| {
2055                        if fc_name == "transfer_to_agent" {
2056                            return false;
2057                        }
2058                        if let Some(tool) = tool_map.get(fc_name)
2059                            && tool.is_builtin()
2060                        {
2061                            adk_telemetry::debug!(tool.name = %fc_name, "skipping built-in tool execution");
2062                            return false;
2063                        }
2064                        true
2065                    }).collect();
2066
2067                    // ===== TOOL CONFIRMATION PRE-CHECK =====
2068                    // Tool confirmation interrupts cause an immediate return,
2069                    // so check before parallel dispatch.
2070                    let mut confirmation_interrupted = false;
2071                    for (_, fc_name, fc_args, _, fc_call_id) in &fc_parts {
2072                        if tool_confirmation_policy.requires_confirmation(fc_name)
2073                            && ctx.run_config().tool_confirmation_decisions.get(fc_name).copied().is_none()
2074                        {
2075                                let mut ce = Event::new(&invocation_id);
2076                                ce.author = agent_name.clone();
2077                                ce.llm_response.interrupted = true;
2078                                ce.llm_response.turn_complete = true;
2079                                ce.llm_response.content = Some(Content {
2080                                    role: "model".to_string(),
2081                                    parts: vec![Part::Text {
2082                                        text: format!(
2083                                            "Tool confirmation required for '{}'. Provide approve/deny decision to continue.",
2084                                            fc_name
2085                                        ),
2086                                    }],
2087                                });
2088                                ce.actions.tool_confirmation = Some(ToolConfirmationRequest {
2089                                    tool_name: fc_name.clone(),
2090                                    function_call_id: Some(fc_call_id.clone()),
2091                                    args: fc_args.clone(),
2092                                });
2093                                yield Ok(ce);
2094                                confirmation_interrupted = true;
2095                                break;
2096                        }
2097                    }
2098                    if confirmation_interrupted {
2099                        return;
2100                    }
2101
2102                    // Wrap circuit breaker in Mutex for shared access across parallel futures.
2103                    let cb_mutex = std::sync::Mutex::new(circuit_breaker_state.take());
2104
2105                    // Create concurrency manager for semaphore-based tool dispatch enforcement.
2106                    // Per-tool overrides take precedence over the global limit.
2107                    let concurrency_manager = adk_core::ToolConcurrencyManager::new(
2108                        &ctx.run_config().tool_concurrency,
2109                    );
2110
2111                    // Per-tool execution async block. Returns (index, Content, EventActions, escalate_or_skip).
2112                    // Each tool retains its own retry budget, circuit breaker, tracing span,
2113                    // before/after callbacks, and error handling. Errors are captured as
2114                    // { "error": "..." } JSON — failed tools do not abort the batch.
2115                    let execute_one_tool = |idx: usize, name: String, args: serde_json::Value,
2116                                            id: Option<String>, function_call_id: String| {
2117                        let ctx = ctx.clone();
2118                        let tool_map = &tool_map;
2119                        let tool_retry_budgets = &tool_retry_budgets;
2120                        let default_retry_budget = &default_retry_budget;
2121                        let before_tool_callbacks = &before_tool_callbacks;
2122                        let after_tool_callbacks = &after_tool_callbacks;
2123                        let after_tool_callbacks_full = &after_tool_callbacks_full;
2124                        let on_tool_error_callbacks = &on_tool_error_callbacks;
2125                        let tool_confirmation_policy = &tool_confirmation_policy;
2126                        let cb_mutex = &cb_mutex;
2127                        let invocation_id = &invocation_id;
2128                        let concurrency_manager = &concurrency_manager;
2129                        #[cfg(feature = "enhanced-plugins")]
2130                        let enhanced_plugin_manager = &enhanced_plugin_manager;
2131                        async move {
2132                            let mut tool_actions = EventActions::default();
2133                            let mut response_content: Option<Content> = None;
2134                            let mut run_after_tool_callbacks = true;
2135                            let mut tool_outcome_for_callback: Option<ToolOutcome> = None;
2136                            let mut executed_tool: Option<Arc<dyn Tool>> = None;
2137                            let mut executed_tool_response: Option<serde_json::Value> = None;
2138
2139                            // Acquire concurrency permit before tool execution.
2140                            // The permit is held for the entire duration of this tool call
2141                            // and released on drop when this async block completes.
2142                            let _concurrency_permit = match concurrency_manager.acquire(&name).await {
2143                                Ok(permit) => Some(permit),
2144                                Err(e) => {
2145                                    // Concurrency limit reached with Fail policy — return error
2146                                    let error_content = Content {
2147                                        role: "function".to_string(),
2148                                        parts: vec![Part::FunctionResponse {
2149                                            function_response: FunctionResponseData::new(
2150                                                name.clone(),
2151                                                serde_json::json!({ "error": e.to_string() }),
2152                                            ),
2153                                            id: id.clone(),
2154                                        }],
2155                                    };
2156                                    return (idx, error_content, tool_actions, false);
2157                                }
2158                            };
2159
2160                            // Tool confirmation (deny case; None handled by pre-check)
2161                            if tool_confirmation_policy.requires_confirmation(&name) {
2162                                match ctx.run_config().tool_confirmation_decisions.get(&name).copied() {
2163                                    Some(ToolConfirmationDecision::Approve) => {
2164                                        tool_actions.tool_confirmation_decision =
2165                                            Some(ToolConfirmationDecision::Approve);
2166                                    }
2167                                    Some(ToolConfirmationDecision::Deny) => {
2168                                        tool_actions.tool_confirmation_decision =
2169                                            Some(ToolConfirmationDecision::Deny);
2170                                        response_content = Some(Content {
2171                                            role: "function".to_string(),
2172                                            parts: vec![Part::FunctionResponse {
2173                                                function_response: FunctionResponseData::new(
2174                                                    name.clone(),
2175                                                    serde_json::json!({
2176                                                        "error": format!("Tool '{}' execution denied by confirmation policy", name)
2177                                                    }),
2178                                                ),
2179                                                id: id.clone(),
2180                                            }],
2181                                        });
2182                                        run_after_tool_callbacks = false;
2183                                    }
2184                                    None => {
2185                                        response_content = Some(Content {
2186                                            role: "function".to_string(),
2187                                            parts: vec![Part::FunctionResponse {
2188                                                function_response: FunctionResponseData::new(
2189                                                    name.clone(),
2190                                                    serde_json::json!({
2191                                                        "error": format!("Tool '{}' requires confirmation", name)
2192                                                    }),
2193                                                ),
2194                                                id: id.clone(),
2195                                            }],
2196                                        });
2197                                        run_after_tool_callbacks = false;
2198                                    }
2199                                }
2200                            }
2201
2202                            // Before-tool callbacks
2203                            // Track potentially modified args for enhanced plugin after-hook
2204                            #[allow(unused_mut)]
2205                            let mut final_args = args.clone();
2206
2207                            // ===== ENHANCED PLUGIN: BEFORE TOOL CALL =====
2208                            #[cfg(feature = "enhanced-plugins")]
2209                            if response_content.is_none() {
2210                                if let Some(epm) = &enhanced_plugin_manager {
2211                                    if let Some(tool_ref) = tool_map.get(&name) {
2212                                        match epm.run_before_tool_call(
2213                                            tool_ref.clone(),
2214                                            final_args.clone(),
2215                                            ctx.clone() as Arc<dyn CallbackContext>,
2216                                        ).await {
2217                                            Ok(BeforeToolCallResult::Continue(modified_args)) => {
2218                                                final_args = modified_args;
2219                                            }
2220                                            Ok(BeforeToolCallResult::ShortCircuit(synthetic_result)) => {
2221                                                // Short-circuit: use synthetic result, skip tool execution
2222                                                response_content = Some(Content {
2223                                                    role: "function".to_string(),
2224                                                    parts: vec![Part::FunctionResponse {
2225                                                        function_response: FunctionResponseData::from_tool_result(
2226                                                            name.clone(),
2227                                                            synthetic_result,
2228                                                        ),
2229                                                        id: id.clone(),
2230                                                    }],
2231                                                });
2232                                                executed_tool = Some(tool_ref.clone());
2233                                            }
2234                                            Err(e) => {
2235                                                response_content = Some(Content {
2236                                                    role: "function".to_string(),
2237                                                    parts: vec![Part::FunctionResponse {
2238                                                        function_response: FunctionResponseData::new(
2239                                                            name.clone(),
2240                                                            serde_json::json!({ "error": e.to_string() }),
2241                                                        ),
2242                                                        id: id.clone(),
2243                                                    }],
2244                                                });
2245                                                run_after_tool_callbacks = false;
2246                                            }
2247                                        }
2248                                    }
2249                                }
2250                            }
2251
2252                            if response_content.is_none() {
2253                                let tool_ctx = Arc::new(ToolCallbackContext::new(
2254                                    ctx.clone(),
2255                                    name.clone(),
2256                                    final_args.clone(),
2257                                ));
2258                                for callback in before_tool_callbacks.as_ref() {
2259                                    match callback(tool_ctx.clone() as Arc<dyn CallbackContext>).await {
2260                                        Ok(Some(c)) => { response_content = Some(c); break; }
2261                                        Ok(None) => continue,
2262                                        Err(e) => {
2263                                            response_content = Some(Content {
2264                                                role: "function".to_string(),
2265                                                parts: vec![Part::FunctionResponse {
2266                                                    function_response: FunctionResponseData::new(
2267                                                        name.clone(),
2268                                                        serde_json::json!({ "error": e.to_string() }),
2269                                                    ),
2270                                                    id: id.clone(),
2271                                                }],
2272                                            });
2273                                            run_after_tool_callbacks = false;
2274                                            break;
2275                                        }
2276                                    }
2277                                }
2278                            }
2279
2280                            // Circuit breaker check
2281                            if response_content.is_none() {
2282                                let guard = cb_mutex.lock().unwrap_or_else(|e| e.into_inner());
2283                                if let Some(ref cb_state) = *guard
2284                                    && cb_state.is_open(&name)
2285                                {
2286                                    let msg = format!(
2287                                        "Tool '{}' is temporarily disabled after {} consecutive failures",
2288                                        name, cb_state.threshold
2289                                    );
2290                                    tracing::warn!(tool.name = %name, "circuit breaker open, skipping tool execution");
2291                                    response_content = Some(Content {
2292                                        role: "function".to_string(),
2293                                        parts: vec![Part::FunctionResponse {
2294                                            function_response: FunctionResponseData::new(
2295                                                name.clone(),
2296                                                serde_json::json!({ "error": msg }),
2297                                            ),
2298                                            id: id.clone(),
2299                                        }],
2300                                    });
2301                                    run_after_tool_callbacks = false;
2302                                }
2303                                drop(guard);
2304                            }
2305
2306                            // Execute tool with retry budget and tracing
2307                            if response_content.is_none() {
2308                                if let Some(tool) = tool_map.get(&name) {
2309                                    let tool_ctx: Arc<dyn ToolContext> = Arc::new(
2310                                        AgentToolContext::new(ctx.clone(), function_call_id.clone()),
2311                                    );
2312                                    let span_name = format!("execute_tool {name}");
2313                                    let tool_span = tracing::info_span!(
2314                                        "",
2315                                        otel.name = %span_name,
2316                                        tool.name = %name,
2317                                        "gcp.vertex.agent.event_id" = %format!("{}_{}", invocation_id, name),
2318                                        "gcp.vertex.agent.invocation_id" = %invocation_id,
2319                                        "gcp.vertex.agent.session_id" = %ctx.session_id(),
2320                                        "gen_ai.conversation.id" = %ctx.session_id()
2321                                    );
2322
2323                                    let budget = tool_retry_budgets.get(&name)
2324                                        .or(default_retry_budget.as_ref());
2325                                    let max_attempts = budget.map(|b| b.max_retries + 1).unwrap_or(1);
2326                                    let retry_delay = budget.map(|b| b.delay).unwrap_or_default();
2327
2328                                    let tool_clone = tool.clone();
2329                                    let tool_start = std::time::Instant::now();
2330                                    let mut last_error = String::new();
2331                                    let mut final_attempt: u32 = 0;
2332                                    let mut retry_result: Option<serde_json::Value> = None;
2333
2334                                    for attempt in 0..max_attempts {
2335                                        final_attempt = attempt;
2336                                        if attempt > 0 {
2337                                            tokio::time::sleep(retry_delay).await;
2338                                        }
2339                                        match async {
2340                                            let args_payload = trace_json_payload(
2341                                                &final_args,
2342                                                ctx.run_config().record_payloads,
2343                                                ctx.run_config().trace_payload_max_bytes,
2344                                            );
2345                                            tracing::debug!(tool.name = %name, tool.args = %args_payload, attempt = attempt, "tool_call");
2346                                            let exec_future = tool_clone.execute(tool_ctx.clone(), final_args.clone());
2347                                            let unwind_safe_future = std::panic::AssertUnwindSafe(
2348                                                tokio::time::timeout(tool_timeout, exec_future)
2349                                            );
2350                                            match futures::FutureExt::catch_unwind(unwind_safe_future).await {
2351                                                Ok(result) => result,
2352                                                Err(_panic) => Ok(Err(adk_core::AdkError::tool(
2353                                                    format!("tool '{}' panicked during execution", name),
2354                                                ))),
2355                                            }
2356                                        }.instrument(tool_span.clone()).await {
2357                                            Ok(Ok(value)) => {
2358                                                let result_payload = trace_json_payload(
2359                                                    &value,
2360                                                    ctx.run_config().record_payloads,
2361                                                    ctx.run_config().trace_payload_max_bytes,
2362                                                );
2363                                                tracing::debug!(tool.name = %name, tool.result = %result_payload, "tool_result");
2364                                                retry_result = Some(value);
2365                                                break;
2366                                            }
2367                                            Ok(Err(e)) => {
2368                                                last_error = e.to_string();
2369                                                if attempt + 1 < max_attempts {
2370                                                    tracing::warn!(tool.name = %name, attempt = attempt, error = %last_error, "tool execution failed, retrying");
2371                                                } else {
2372                                                    tracing::warn!(tool.name = %name, error = %last_error, "tool_error");
2373                                                }
2374                                            }
2375                                            Err(_) => {
2376                                                last_error = format!(
2377                                                    "Tool '{}' timed out after {} seconds",
2378                                                    name, tool_timeout.as_secs()
2379                                                );
2380                                                if attempt + 1 < max_attempts {
2381                                                    tracing::warn!(tool.name = %name, attempt = attempt, timeout_secs = tool_timeout.as_secs(), "tool timed out, retrying");
2382                                                } else {
2383                                                    tracing::warn!(tool.name = %name, timeout_secs = tool_timeout.as_secs(), "tool_timeout");
2384                                                }
2385                                            }
2386                                        }
2387                                    }
2388
2389                                    let tool_duration = tool_start.elapsed();
2390                                    let (tool_success, tool_error_message, function_response) = match retry_result {
2391                                        Some(value) => (true, None, value),
2392                                        None => (false, Some(last_error.clone()), serde_json::json!({ "error": last_error })),
2393                                    };
2394
2395                                    let outcome = ToolOutcome {
2396                                        tool_name: name.clone(),
2397                                        tool_args: final_args.clone(),
2398                                        success: tool_success,
2399                                        duration: tool_duration,
2400                                        error_message: tool_error_message.clone(),
2401                                        attempt: final_attempt,
2402                                    };
2403                                    tool_outcome_for_callback = Some(outcome);
2404
2405                                    // Circuit breaker recording
2406                                    {
2407                                        let mut guard = cb_mutex.lock().unwrap_or_else(|e| e.into_inner());
2408                                        if let Some(ref mut cb_state) = *guard {
2409                                            cb_state.record(tool_outcome_for_callback.as_ref().unwrap());
2410                                        }
2411                                    }
2412
2413                                    // On-tool-error callbacks
2414                                    let final_function_response = if !tool_success {
2415                                        let mut fallback_result = None;
2416                                        let error_msg = tool_error_message.clone().unwrap_or_default();
2417                                        for callback in on_tool_error_callbacks.as_ref() {
2418                                            match callback(
2419                                                ctx.clone() as Arc<dyn CallbackContext>,
2420                                                tool.clone(),
2421                                                final_args.clone(),
2422                                                error_msg.clone(),
2423                                            ).await {
2424                                                Ok(Some(result)) => { fallback_result = Some(result); break; }
2425                                                Ok(None) => continue,
2426                                                Err(e) => { tracing::warn!(error = %e, "on_tool_error callback failed"); break; }
2427                                            }
2428                                        }
2429                                        fallback_result.unwrap_or(function_response)
2430                                    } else {
2431                                        function_response
2432                                    };
2433
2434                                    let confirmation_decision = tool_actions.tool_confirmation_decision;
2435                                    tool_actions = tool_ctx.actions();
2436                                    if tool_actions.tool_confirmation_decision.is_none() {
2437                                        tool_actions.tool_confirmation_decision = confirmation_decision;
2438                                    }
2439                                    executed_tool = Some(tool.clone());
2440                                    executed_tool_response = Some(final_function_response.clone());
2441                                    response_content = Some(Content {
2442                                        role: "function".to_string(),
2443                                        parts: vec![Part::FunctionResponse {
2444                                            function_response: FunctionResponseData::from_tool_result(
2445                                                name.clone(),
2446                                                final_function_response,
2447                                            ),
2448                                            id: id.clone(),
2449                                        }],
2450                                    });
2451                                } else {
2452                                    response_content = Some(Content {
2453                                        role: "function".to_string(),
2454                                        parts: vec![Part::FunctionResponse {
2455                                            function_response: FunctionResponseData::new(
2456                                                name.clone(),
2457                                                serde_json::json!({
2458                                                    "error": format!("Tool {} not found", name)
2459                                                }),
2460                                            ),
2461                                            id: id.clone(),
2462                                        }],
2463                                    });
2464                                }
2465                            }
2466
2467                            // After-tool callbacks
2468                            let mut response_content = response_content.expect("tool response content is set");
2469                            if run_after_tool_callbacks {
2470                                let outcome_ctx: Arc<dyn CallbackContext> = match tool_outcome_for_callback {
2471                                    Some(outcome) => Arc::new(ToolOutcomeCallbackContext {
2472                                        inner: ctx.clone() as Arc<dyn CallbackContext>,
2473                                        outcome,
2474                                    }),
2475                                    None => ctx.clone() as Arc<dyn CallbackContext>,
2476                                };
2477                                let cb_ctx: Arc<dyn CallbackContext> = Arc::new(ToolCallbackContext::new(
2478                                    outcome_ctx,
2479                                    name.clone(),
2480                                    final_args.clone(),
2481                                ));
2482                                for callback in after_tool_callbacks.as_ref() {
2483                                    match callback(cb_ctx.clone()).await {
2484                                        Ok(Some(modified)) => { response_content = modified; break; }
2485                                        Ok(None) => continue,
2486                                        Err(e) => {
2487                                            response_content = Content {
2488                                                role: "function".to_string(),
2489                                                parts: vec![Part::FunctionResponse {
2490                                                    function_response: FunctionResponseData::new(
2491                                                        name.clone(),
2492                                                        serde_json::json!({ "error": e.to_string() }),
2493                                                    ),
2494                                                    id: id.clone(),
2495                                                }],
2496                                            };
2497                                            break;
2498                                        }
2499                                    }
2500                                }
2501                                if let (Some(tool_ref), Some(tool_resp)) = (&executed_tool, executed_tool_response) {
2502                                    for callback in after_tool_callbacks_full.as_ref() {
2503                                        match callback(
2504                                            cb_ctx.clone(), tool_ref.clone(), final_args.clone(), tool_resp.clone(),
2505                                        ).await {
2506                                            Ok(Some(modified_value)) => {
2507                                                response_content = Content {
2508                                                    role: "function".to_string(),
2509                                                    parts: vec![Part::FunctionResponse {
2510                                                        function_response: FunctionResponseData::from_tool_result(
2511                                                            name.clone(),
2512                                                            modified_value,
2513                                                        ),
2514                                                        id: id.clone(),
2515                                                    }],
2516                                                };
2517                                                break;
2518                                            }
2519                                            Ok(None) => continue,
2520                                            Err(e) => {
2521                                                response_content = Content {
2522                                                    role: "function".to_string(),
2523                                                    parts: vec![Part::FunctionResponse {
2524                                                        function_response: FunctionResponseData::new(
2525                                                            name.clone(),
2526                                                            serde_json::json!({ "error": e.to_string() }),
2527                                                        ),
2528                                                        id: id.clone(),
2529                                                    }],
2530                                                };
2531                                                break;
2532                                            }
2533                                        }
2534                                    }
2535                                }
2536
2537                                // ===== ENHANCED PLUGIN: AFTER TOOL CALL =====
2538                                // Enhanced plugins can modify the tool result after legacy callbacks.
2539                                #[cfg(feature = "enhanced-plugins")]
2540                                if let Some(epm) = &enhanced_plugin_manager {
2541                                    if let Some(tool_ref) = &executed_tool {
2542                                        // Extract the result value from the response content
2543                                        let result_value = response_content.parts.iter()
2544                                            .find_map(|p| {
2545                                                if let Part::FunctionResponse { function_response, .. } = p {
2546                                                    Some(function_response.response.clone())
2547                                                } else {
2548                                                    None
2549                                                }
2550                                            })
2551                                            .unwrap_or(serde_json::json!(null));
2552
2553                                        match epm.run_after_tool_call(
2554                                            tool_ref.clone(),
2555                                            &final_args,
2556                                            result_value,
2557                                            ctx.clone() as Arc<dyn CallbackContext>,
2558                                        ).await {
2559                                            Ok(adk_plugin::AfterToolCallResult::Continue(modified_result)) => {
2560                                                response_content = Content {
2561                                                    role: "function".to_string(),
2562                                                    parts: vec![Part::FunctionResponse {
2563                                                        function_response: FunctionResponseData::from_tool_result(
2564                                                            name.clone(),
2565                                                            modified_result,
2566                                                        ),
2567                                                        id: id.clone(),
2568                                                    }],
2569                                                };
2570                                            }
2571                                            Err(e) => {
2572                                                response_content = Content {
2573                                                    role: "function".to_string(),
2574                                                    parts: vec![Part::FunctionResponse {
2575                                                        function_response: FunctionResponseData::new(
2576                                                            name.clone(),
2577                                                            serde_json::json!({ "error": e.to_string() }),
2578                                                        ),
2579                                                        id: id.clone(),
2580                                                    }],
2581                                                };
2582                                            }
2583                                        }
2584                                    }
2585                                }
2586                            }
2587
2588                            let escalate_or_skip = tool_actions.escalate || tool_actions.skip_summarization;
2589                            (idx, response_content, tool_actions, escalate_or_skip)
2590                        }
2591                    };
2592
2593                    // ===== DISPATCH BASED ON STRATEGY =====
2594                    let mut results: Vec<(usize, Content, EventActions, bool)> = match strategy {
2595                        ToolExecutionStrategy::Sequential => {
2596                            let mut results = Vec::with_capacity(fc_parts.len());
2597                            for (idx, name, args, id, fcid) in fc_parts {
2598                                results.push(execute_one_tool(idx, name, args, id, fcid).await);
2599                            }
2600                            results
2601                        }
2602                        ToolExecutionStrategy::Parallel => {
2603                            use futures::StreamExt as _;
2604                            // All concurrency enforcement is handled by the
2605                            // ToolConcurrencyManager semaphore inside execute_one_tool.
2606                            // Use fc_parts.len() as buffer so all futures can start
2607                            // and queue on the semaphore for proper per-tool limiting.
2608                            let buffer_size = fc_parts.len().max(1);
2609                            futures::stream::iter(fc_parts.into_iter().map(
2610                                |(idx, name, args, id, fcid)| {
2611                                    execute_one_tool(idx, name, args, id, fcid)
2612                                },
2613                            ))
2614                            .buffer_unordered(buffer_size)
2615                            .collect()
2616                            .await
2617                        }
2618                        ToolExecutionStrategy::Auto => {
2619                            // Partition by is_read_only()
2620                            let mut read_only_fcs = Vec::new();
2621                            let mut mutable_fcs = Vec::new();
2622                            for fc in fc_parts {
2623                                let is_ro = tool_map.get(&fc.1)
2624                                    .map(|t| t.is_read_only())
2625                                    .unwrap_or(false);
2626                                if is_ro { read_only_fcs.push(fc); } else { mutable_fcs.push(fc); }
2627                            }
2628                            let mut all_results = Vec::new();
2629                            // Execute read-only tools concurrently first
2630                            // Concurrency enforcement is handled by the semaphore
2631                            // inside execute_one_tool.
2632                            if !read_only_fcs.is_empty() {
2633                                use futures::StreamExt as _;
2634                                let buffer_size = read_only_fcs.len().max(1);
2635                                all_results.extend(
2636                                    futures::stream::iter(read_only_fcs.into_iter().map(
2637                                        |(idx, name, args, id, fcid)| {
2638                                            execute_one_tool(idx, name, args, id, fcid)
2639                                        },
2640                                    ))
2641                                    .buffer_unordered(buffer_size)
2642                                    .collect::<Vec<_>>()
2643                                    .await,
2644                                );
2645                            }
2646                            // Then execute mutable tools sequentially
2647                            for (idx, name, args, id, fcid) in mutable_fcs {
2648                                all_results.push(execute_one_tool(idx, name, args, id, fcid).await);
2649                            }
2650                            all_results
2651                        }
2652                    };
2653                    // Preserve LLM-returned order even when tool futures finish out of order.
2654                    results.sort_by_key(|r| r.0);
2655
2656                    // Restore circuit breaker state from the mutex
2657                    circuit_breaker_state = cb_mutex.into_inner().unwrap_or_else(|e| e.into_inner());
2658
2659                    // Yield results in original order
2660                    for (_, response_content, tool_actions, escalate_or_skip) in results {
2661                        let mut tool_event = Event::new(&invocation_id);
2662                        tool_event.author = agent_name.clone();
2663                        tool_event.actions = tool_actions;
2664                        tool_event.llm_response.content = Some(response_content.clone());
2665                        yield Ok(tool_event);
2666
2667                        if escalate_or_skip {
2668                            return;
2669                        }
2670
2671                        conversation_history.push(response_content);
2672                    }
2673                }
2674
2675                // If all function calls were from long-running tools, we need ONE more model call
2676                // to let the model generate a user-friendly response about the pending task
2677                // But we mark this as the final iteration to prevent infinite loops
2678                if all_calls_are_long_running {
2679                    // Continue to next iteration for model to respond, but this will be the last
2680                    // The model will see the tool response and generate text like "Started task X..."
2681                    // On next iteration, there won't be function calls, so we'll break naturally
2682                }
2683            }
2684
2685            // ===== AFTER AGENT CALLBACKS =====
2686            // Execute after the agent completes
2687            for callback in after_agent_callbacks.as_ref() {
2688                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
2689                    Ok(Some(content)) => {
2690                        // Callback returned content - yield it
2691                        let mut after_event = Event::new(&invocation_id);
2692                        after_event.author = agent_name.clone();
2693                        after_event.llm_response.content = Some(content);
2694                        yield Ok(after_event);
2695                        break; // First callback that returns content wins
2696                    }
2697                    Ok(None) => {
2698                        // Continue to next callback
2699                        continue;
2700                    }
2701                    Err(e) => {
2702                        // Callback failed - propagate error
2703                        yield Err(e);
2704                        return;
2705                    }
2706                }
2707            }
2708        };
2709
2710        Ok(Box::pin(s))
2711    }
2712}