Skip to main content

adk_realtime/
agent.rs

1//! RealtimeAgent - an Agent implementation for real-time voice interactions.
2//!
3//! This module provides `RealtimeAgent`, which implements the `adk_core::Agent` trait
4//! and provides the same callback/tool/instruction features as `LlmAgent`, but uses
5//! real-time bidirectional audio streaming instead of text-based LLM calls.
6//!
7//! # Architecture
8//!
9//! ```text
10//!                     ┌─────────────────────────────────────────┐
11//!                     │              Agent Trait                │
12//!                     │  (name, description, run, sub_agents)   │
13//!                     └────────────────┬────────────────────────┘
14//!                                      │
15//!              ┌───────────────────────┼───────────────────────┐
16//!              │                       │                       │
17//!     ┌────────▼────────┐    ┌─────────▼─────────┐   ┌─────────▼─────────┐
18//!     │    LlmAgent     │    │  RealtimeAgent    │   │  SequentialAgent  │
19//!     │  (text-based)   │    │  (voice-based)    │   │   (workflow)      │
20//!     └─────────────────┘    └───────────────────┘   └───────────────────┘
21//! ```
22//!
23//! # Shared Features with LlmAgent
24//!
25//! - **Tools**: Function tools that can be called during conversation
26//! - **Callbacks**: before_agent, after_agent, before_tool, after_tool
27//! - **Instructions**: Static or dynamic instruction providers
28//! - **Sub-agents**: Agent handoff/transfer support
29//! - **Context**: Full access to InvocationContext (session, memory, artifacts)
30//!
31//! # Example
32//!
33//! ```rust,ignore
34//! use adk_realtime::RealtimeAgent;
35//! use adk_realtime::openai::OpenAIRealtimeModel;
36//!
37//! let model = OpenAIRealtimeModel::new(api_key, "gpt-4o-realtime-preview-2024-12-17");
38//!
39//! let agent = RealtimeAgent::builder("voice_assistant")
40//!     .model(Arc::new(model))
41//!     .instruction("You are a helpful voice assistant.")
42//!     .voice("alloy")
43//!     .tool(Arc::new(weather_tool))
44//!     .before_agent_callback(|ctx| async move {
45//!         println!("Starting voice session for user: {}", ctx.user_id());
46//!         Ok(None)
47//!     })
48//!     .build()?;
49//!
50//! // Run through standard ADK runner
51//! let runner = Runner::new(agent);
52//! runner.run(session, user_content).await?;
53//! ```
54
55use crate::config::{RealtimeConfig, ToolDefinition, VadConfig, VadMode};
56use crate::events::{ServerEvent, ToolResponse};
57use adk_core::{
58    AdkError, AfterAgentCallback, AfterToolCallback, Agent, BeforeAgentCallback,
59    BeforeToolCallback, CallbackContext, Content, Event, EventActions, EventStream,
60    GlobalInstructionProvider, InstructionProvider, InvocationContext, MemoryEntry, Part,
61    ReadonlyContext, Result, Tool, ToolCallbackContext, ToolContext, Toolset,
62};
63use async_stream::stream;
64use async_trait::async_trait;
65
66use std::sync::{Arc, Mutex};
67
68/// Shared realtime model type (thread-safe for async usage).
69pub type BoxedRealtimeModel = Arc<dyn crate::model::RealtimeModel>;
70
71/// A real-time voice agent that implements the ADK Agent trait.
72///
73/// `RealtimeAgent` provides bidirectional audio streaming while maintaining
74/// compatibility with the standard ADK agent ecosystem. It supports the same
75/// callbacks, tools, and instruction patterns as `LlmAgent`.
76pub struct RealtimeAgent {
77    name: String,
78    description: String,
79    model: BoxedRealtimeModel,
80
81    // Instructions (same as LlmAgent)
82    instruction: Option<String>,
83    instruction_provider: Option<Arc<InstructionProvider>>,
84    global_instruction: Option<String>,
85    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
86
87    // Voice-specific settings
88    voice: Option<String>,
89    vad_config: Option<VadConfig>,
90    modalities: Vec<String>,
91
92    // Tools (same as LlmAgent)
93    tools: Vec<Arc<dyn Tool>>,
94    toolsets: Vec<Arc<dyn Toolset>>,
95    sub_agents: Vec<Arc<dyn Agent>>,
96
97    // Callbacks (same as LlmAgent)
98    before_callbacks: Arc<Vec<BeforeAgentCallback>>,
99    after_callbacks: Arc<Vec<AfterAgentCallback>>,
100    before_tool_callbacks: Arc<Vec<BeforeToolCallback>>,
101    after_tool_callbacks: Arc<Vec<AfterToolCallback>>,
102
103    // Realtime-specific callbacks
104    on_audio: Option<AudioCallback>,
105    on_transcript: Option<TranscriptCallback>,
106    on_speech_started: Option<SpeechCallback>,
107    on_speech_stopped: Option<SpeechCallback>,
108
109    // Video avatar configuration
110    #[cfg(feature = "video-avatar")]
111    avatar_config: Option<crate::avatar::AvatarConfig>,
112
113    // Video avatar provider instance
114    #[cfg(feature = "video-avatar")]
115    avatar_provider: Option<std::sync::Arc<dyn crate::avatar::AvatarProvider>>,
116}
117
/// Pinned, boxed, sendable future returned by every realtime event callback.
type CallbackFuture = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>;

/// Callback for audio output events (receives raw PCM bytes).
///
/// NOTE(review): the second argument is presumably the item id of the audio
/// delta — confirm against the call site.
pub type AudioCallback = Arc<dyn Fn(&[u8], &str) -> CallbackFuture + Send + Sync>;

/// Callback for transcript events.
pub type TranscriptCallback = Arc<dyn Fn(&str, &str) -> CallbackFuture + Send + Sync>;

/// Callback for speech detection events.
///
/// NOTE(review): the `u64` argument is presumably a timestamp in
/// milliseconds — confirm against the call site.
pub type SpeechCallback = Arc<dyn Fn(u64) -> CallbackFuture + Send + Sync>;
136
137impl std::fmt::Debug for RealtimeAgent {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("RealtimeAgent")
140            .field("name", &self.name)
141            .field("description", &self.description)
142            .field("model", &self.model.model_id())
143            .field("voice", &self.voice)
144            .field("tools_count", &self.tools.len())
145            .field("toolsets_count", &self.toolsets.len())
146            .field("sub_agents_count", &self.sub_agents.len())
147            .finish()
148    }
149}
150
151/// Builder for RealtimeAgent.
152pub struct RealtimeAgentBuilder {
153    name: String,
154    description: Option<String>,
155    model: Option<BoxedRealtimeModel>,
156    instruction: Option<String>,
157    instruction_provider: Option<Arc<InstructionProvider>>,
158    global_instruction: Option<String>,
159    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
160    voice: Option<String>,
161    vad_config: Option<VadConfig>,
162    modalities: Vec<String>,
163    tools: Vec<Arc<dyn Tool>>,
164    toolsets: Vec<Arc<dyn Toolset>>,
165    sub_agents: Vec<Arc<dyn Agent>>,
166    before_callbacks: Vec<BeforeAgentCallback>,
167    after_callbacks: Vec<AfterAgentCallback>,
168    before_tool_callbacks: Vec<BeforeToolCallback>,
169    after_tool_callbacks: Vec<AfterToolCallback>,
170    on_audio: Option<AudioCallback>,
171    on_transcript: Option<TranscriptCallback>,
172    on_speech_started: Option<SpeechCallback>,
173    on_speech_stopped: Option<SpeechCallback>,
174
175    #[cfg(feature = "video-avatar")]
176    avatar_config: Option<crate::avatar::AvatarConfig>,
177
178    #[cfg(feature = "video-avatar")]
179    avatar_provider: Option<std::sync::Arc<dyn crate::avatar::AvatarProvider>>,
180}
181
182impl RealtimeAgentBuilder {
183    /// Create a new builder with the given agent name.
184    pub fn new(name: impl Into<String>) -> Self {
185        Self {
186            name: name.into(),
187            description: None,
188            model: None,
189            instruction: None,
190            instruction_provider: None,
191            global_instruction: None,
192            global_instruction_provider: None,
193            voice: None,
194            vad_config: None,
195            modalities: vec!["text".to_string(), "audio".to_string()],
196            tools: Vec::new(),
197            toolsets: Vec::new(),
198            sub_agents: Vec::new(),
199            before_callbacks: Vec::new(),
200            after_callbacks: Vec::new(),
201            before_tool_callbacks: Vec::new(),
202            after_tool_callbacks: Vec::new(),
203            on_audio: None,
204            on_transcript: None,
205            on_speech_started: None,
206            on_speech_stopped: None,
207            #[cfg(feature = "video-avatar")]
208            avatar_config: None,
209            #[cfg(feature = "video-avatar")]
210            avatar_provider: None,
211        }
212    }
213
214    /// Set the agent description.
215    pub fn description(mut self, desc: impl Into<String>) -> Self {
216        self.description = Some(desc.into());
217        self
218    }
219
220    /// Set the realtime model.
221    pub fn model(mut self, model: BoxedRealtimeModel) -> Self {
222        self.model = Some(model);
223        self
224    }
225
226    /// Set a static instruction.
227    pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
228        self.instruction = Some(instruction.into());
229        self
230    }
231
232    /// Set a dynamic instruction provider.
233    pub fn instruction_provider(mut self, provider: InstructionProvider) -> Self {
234        self.instruction_provider = Some(Arc::new(provider));
235        self
236    }
237
238    /// Set a static global instruction.
239    pub fn global_instruction(mut self, instruction: impl Into<String>) -> Self {
240        self.global_instruction = Some(instruction.into());
241        self
242    }
243
244    /// Set a dynamic global instruction provider.
245    pub fn global_instruction_provider(mut self, provider: GlobalInstructionProvider) -> Self {
246        self.global_instruction_provider = Some(Arc::new(provider));
247        self
248    }
249
250    /// Set the voice for audio output.
251    pub fn voice(mut self, voice: impl Into<String>) -> Self {
252        self.voice = Some(voice.into());
253        self
254    }
255
256    /// Set voice activity detection configuration.
257    pub fn vad(mut self, config: VadConfig) -> Self {
258        self.vad_config = Some(config);
259        self
260    }
261
262    /// Enable server-side VAD with default settings.
263    pub fn server_vad(mut self) -> Self {
264        self.vad_config = Some(VadConfig {
265            mode: VadMode::ServerVad,
266            threshold: Some(0.5),
267            prefix_padding_ms: Some(300),
268            silence_duration_ms: Some(500),
269            interrupt_response: Some(true),
270            eagerness: None,
271        });
272        self
273    }
274
275    /// Set output modalities (e.g., ["text", "audio"]).
276    pub fn modalities(mut self, modalities: Vec<String>) -> Self {
277        self.modalities = modalities;
278        self
279    }
280
281    /// Add a tool.
282    pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
283        self.tools.push(tool);
284        self
285    }
286
287    /// Register a dynamic toolset for per-invocation tool resolution.
288    ///
289    /// Toolsets are resolved at the start of each `run()` call using the
290    /// invocation's `ReadonlyContext`. This enables context-dependent tools
291    /// like per-user browser sessions from a pool.
292    pub fn toolset(mut self, toolset: Arc<dyn Toolset>) -> Self {
293        self.toolsets.push(toolset);
294        self
295    }
296
297    /// Add a sub-agent for handoffs.
298    pub fn sub_agent(mut self, agent: Arc<dyn Agent>) -> Self {
299        self.sub_agents.push(agent);
300        self
301    }
302
303    /// Add a before-agent callback.
304    pub fn before_agent_callback(mut self, callback: BeforeAgentCallback) -> Self {
305        self.before_callbacks.push(callback);
306        self
307    }
308
309    /// Add an after-agent callback.
310    pub fn after_agent_callback(mut self, callback: AfterAgentCallback) -> Self {
311        self.after_callbacks.push(callback);
312        self
313    }
314
315    /// Add a before-tool callback.
316    pub fn before_tool_callback(mut self, callback: BeforeToolCallback) -> Self {
317        self.before_tool_callbacks.push(callback);
318        self
319    }
320
321    /// Add an after-tool callback.
322    pub fn after_tool_callback(mut self, callback: AfterToolCallback) -> Self {
323        self.after_tool_callbacks.push(callback);
324        self
325    }
326
327    /// Set callback for audio output events.
328    pub fn on_audio(mut self, callback: AudioCallback) -> Self {
329        self.on_audio = Some(callback);
330        self
331    }
332
333    /// Set callback for transcript events.
334    pub fn on_transcript(mut self, callback: TranscriptCallback) -> Self {
335        self.on_transcript = Some(callback);
336        self
337    }
338
339    /// Set callback for speech started events.
340    pub fn on_speech_started(mut self, callback: SpeechCallback) -> Self {
341        self.on_speech_started = Some(callback);
342        self
343    }
344
345    /// Set callback for speech stopped events.
346    pub fn on_speech_stopped(mut self, callback: SpeechCallback) -> Self {
347        self.on_speech_stopped = Some(callback);
348        self
349    }
350
351    /// Set the video avatar configuration for this agent.
352    ///
353    /// When set, the avatar configuration is included in the session setup
354    /// payload sent to the realtime provider. If the provider does not support
355    /// video avatars, a warning is logged and the session proceeds audio-only.
356    ///
357    /// Requires the `video-avatar` feature flag.
358    #[cfg(feature = "video-avatar")]
359    pub fn avatar(mut self, config: crate::avatar::AvatarConfig) -> Self {
360        self.avatar_config = Some(config);
361        self
362    }
363
364    /// Set the video avatar provider for this agent.
365    ///
366    /// When both an `AvatarConfig` (with a provider kind) and an `AvatarProvider`
367    /// instance are set, the runner routes audio through the avatar provider
368    /// for lip-sync rendering instead of sending raw audio to the client.
369    ///
370    /// Requires the `video-avatar` feature flag.
371    ///
372    /// # Example
373    ///
374    /// ```rust,ignore
375    /// use std::sync::Arc;
376    /// use adk_realtime::avatar::heygen::{HeyGenConfig, HeyGenProvider};
377    ///
378    /// let provider = Arc::new(HeyGenProvider::new(HeyGenConfig::new("key")));
379    /// let agent = RealtimeAgentBuilder::new("assistant")
380    ///     .avatar(avatar_config)
381    ///     .avatar_provider(provider)
382    ///     .build()?;
383    /// ```
384    #[cfg(feature = "video-avatar")]
385    pub fn avatar_provider(
386        mut self,
387        provider: std::sync::Arc<dyn crate::avatar::AvatarProvider>,
388    ) -> Self {
389        self.avatar_provider = Some(provider);
390        self
391    }
392
393    /// Build the RealtimeAgent.
394    pub fn build(self) -> Result<RealtimeAgent> {
395        let model =
396            self.model.ok_or_else(|| AdkError::agent("RealtimeModel is required".to_string()))?;
397
398        Ok(RealtimeAgent {
399            name: self.name,
400            description: self.description.unwrap_or_default(),
401            model,
402            instruction: self.instruction,
403            instruction_provider: self.instruction_provider,
404            global_instruction: self.global_instruction,
405            global_instruction_provider: self.global_instruction_provider,
406            voice: self.voice,
407            vad_config: self.vad_config,
408            modalities: self.modalities,
409            tools: self.tools,
410            toolsets: self.toolsets,
411            sub_agents: self.sub_agents,
412            before_callbacks: Arc::new(self.before_callbacks),
413            after_callbacks: Arc::new(self.after_callbacks),
414            before_tool_callbacks: Arc::new(self.before_tool_callbacks),
415            after_tool_callbacks: Arc::new(self.after_tool_callbacks),
416            on_audio: self.on_audio,
417            on_transcript: self.on_transcript,
418            on_speech_started: self.on_speech_started,
419            on_speech_stopped: self.on_speech_stopped,
420            #[cfg(feature = "video-avatar")]
421            avatar_config: self.avatar_config,
422            #[cfg(feature = "video-avatar")]
423            avatar_provider: self.avatar_provider,
424        })
425    }
426}
427
428impl RealtimeAgent {
429    /// Create a new builder.
430    pub fn builder(name: impl Into<String>) -> RealtimeAgentBuilder {
431        RealtimeAgentBuilder::new(name)
432    }
433
434    /// Get the static instruction, if set.
435    pub fn instruction(&self) -> Option<&String> {
436        self.instruction.as_ref()
437    }
438
439    /// Get the voice setting, if set.
440    pub fn voice(&self) -> Option<&String> {
441        self.voice.as_ref()
442    }
443
444    /// Get the VAD configuration, if set.
445    pub fn vad_config(&self) -> Option<&VadConfig> {
446        self.vad_config.as_ref()
447    }
448
449    /// Get the list of tools.
450    pub fn tools(&self) -> &[Arc<dyn Tool>] {
451        &self.tools
452    }
453
454    /// Get the avatar configuration, if set.
455    ///
456    /// Requires the `video-avatar` feature flag.
457    #[cfg(feature = "video-avatar")]
458    pub fn avatar_config(&self) -> Option<&crate::avatar::AvatarConfig> {
459        self.avatar_config.as_ref()
460    }
461
462    /// Get the avatar provider, if set.
463    ///
464    /// Requires the `video-avatar` feature flag.
465    #[cfg(feature = "video-avatar")]
466    pub fn avatar_provider(&self) -> Option<&std::sync::Arc<dyn crate::avatar::AvatarProvider>> {
467        self.avatar_provider.as_ref()
468    }
469
470    /// Build the realtime configuration from agent settings.
471    async fn build_config(
472        &self,
473        ctx: &Arc<dyn InvocationContext>,
474        resolved_tools: &[Arc<dyn Tool>],
475    ) -> Result<RealtimeConfig> {
476        let mut config = RealtimeConfig::default();
477
478        // Build instruction from providers or static value
479        if let Some(provider) = &self.global_instruction_provider {
480            let global_inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
481            if !global_inst.is_empty() {
482                config.instruction = Some(global_inst);
483            }
484        } else if let Some(ref template) = self.global_instruction {
485            let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
486            config.instruction = Some(processed);
487        }
488
489        // Add agent-specific instruction
490        if let Some(provider) = &self.instruction_provider {
491            let inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
492            if !inst.is_empty() {
493                if let Some(existing) = &mut config.instruction {
494                    existing.push_str("\n\n");
495                    existing.push_str(&inst);
496                } else {
497                    config.instruction = Some(inst);
498                }
499            }
500        } else if let Some(ref template) = self.instruction {
501            let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
502            if let Some(existing) = &mut config.instruction {
503                existing.push_str("\n\n");
504                existing.push_str(&processed);
505            } else {
506                config.instruction = Some(processed);
507            }
508        }
509
510        // Voice settings
511        config.voice = self.voice.clone();
512        config.turn_detection = self.vad_config.clone();
513        config.modalities = Some(self.modalities.clone());
514
515        // Convert ADK tools to realtime tool definitions
516        let tool_defs: Vec<ToolDefinition> = resolved_tools
517            .iter()
518            .map(|t| ToolDefinition {
519                name: t.name().to_string(),
520                description: Some(t.enhanced_description().to_string()),
521                parameters: t.parameters_schema(),
522            })
523            .collect();
524
525        if !tool_defs.is_empty() {
526            config.tools = Some(tool_defs);
527        }
528
529        // Add transfer_to_agent tool if sub-agents exist
530        if !self.sub_agents.is_empty() {
531            let mut tools = config.tools.unwrap_or_default();
532            tools.push(ToolDefinition {
533                name: "transfer_to_agent".to_string(),
534                description: Some("Transfer execution to another agent.".to_string()),
535                parameters: Some(serde_json::json!({
536                    "type": "object",
537                    "properties": {
538                        "agent_name": {
539                            "type": "string",
540                            "description": "The name of the agent to transfer to."
541                        }
542                    },
543                    "required": ["agent_name"]
544                })),
545            });
546            config.tools = Some(tools);
547        }
548
549        // Include avatar configuration in session setup if present.
550        // Currently no realtime provider supports video avatars natively,
551        // so we log a warning and proceed audio-only. The config is still
552        // placed in `extra` so future provider implementations can read it.
553        #[cfg(feature = "video-avatar")]
554        if let Some(ref avatar) = self.avatar_config {
555            tracing::warn!(
556                agent = %self.name,
557                source_url = %avatar.source_url,
558                "video avatar configured but the current realtime provider does not support video avatars; proceeding audio-only"
559            );
560            let avatar_json = serde_json::to_value(avatar).unwrap_or_else(|e| {
561                tracing::warn!("failed to serialize avatar config: {e}");
562                serde_json::Value::Null
563            });
564            let extra = config.extra.get_or_insert_with(|| serde_json::json!({}));
565            if let Some(obj) = extra.as_object_mut() {
566                obj.insert("avatarConfig".to_string(), avatar_json);
567            }
568        }
569
570        Ok(config)
571    }
572
573    /// Execute a tool call.
574    #[allow(dead_code)]
575    async fn execute_tool(
576        &self,
577        ctx: &Arc<dyn InvocationContext>,
578        call_id: &str,
579        name: &str,
580        arguments: &str,
581    ) -> (serde_json::Value, EventActions) {
582        // Find the tool
583        let tool = self.tools.iter().find(|t| t.name() == name);
584
585        if let Some(tool) = tool {
586            let args: serde_json::Value =
587                serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
588
589            // Create tool context
590            let tool_ctx: Arc<dyn ToolContext> =
591                Arc::new(RealtimeToolContext::new(ctx.clone(), call_id.to_string()));
592
593            // Execute before_tool callbacks
594            let tool_cb_ctx =
595                Arc::new(ToolCallbackContext::new(ctx.clone(), name.to_string(), args.clone()));
596            for callback in self.before_tool_callbacks.as_ref() {
597                if let Err(e) = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await {
598                    return (
599                        serde_json::json!({ "error": e.to_string() }),
600                        EventActions::default(),
601                    );
602                }
603            }
604
605            // Execute the tool
606            let result = match tool.execute(tool_ctx.clone(), args.clone()).await {
607                Ok(result) => result,
608                Err(e) => serde_json::json!({ "error": e.to_string() }),
609            };
610
611            let actions = tool_ctx.actions();
612
613            // Execute after_tool callbacks
614            let tool_cb_ctx =
615                Arc::new(ToolCallbackContext::new(ctx.clone(), name.to_string(), args.clone()));
616            for callback in self.after_tool_callbacks.as_ref() {
617                if let Err(e) = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await {
618                    return (serde_json::json!({ "error": e.to_string() }), actions);
619                }
620            }
621
622            (result, actions)
623        } else {
624            (
625                serde_json::json!({ "error": format!("Tool {} not found", name) }),
626                EventActions::default(),
627            )
628        }
629    }
630}
631
632#[async_trait]
633impl Agent for RealtimeAgent {
634    fn name(&self) -> &str {
635        &self.name
636    }
637
638    fn description(&self) -> &str {
639        &self.description
640    }
641
642    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
643        &self.sub_agents
644    }
645
646    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
647        let agent_name = self.name.clone();
648        let invocation_id = ctx.invocation_id().to_string();
649        let model = self.model.clone();
650        let _sub_agents = self.sub_agents.clone();
651
652        // Clone callback refs
653        let before_callbacks = self.before_callbacks.clone();
654        let after_callbacks = self.after_callbacks.clone();
655        let before_tool_callbacks = self.before_tool_callbacks.clone();
656        let after_tool_callbacks = self.after_tool_callbacks.clone();
657        let tools = self.tools.clone();
658        let toolsets = self.toolsets.clone();
659
660        // Clone realtime callbacks
661        let on_audio = self.on_audio.clone();
662        let on_transcript = self.on_transcript.clone();
663        let on_speech_started = self.on_speech_started.clone();
664        let on_speech_stopped = self.on_speech_stopped.clone();
665
666        // Clone avatar provider for the stream closure
667        #[cfg(feature = "video-avatar")]
668        let avatar_provider = self.avatar_provider.clone();
669        #[cfg(feature = "video-avatar")]
670        let avatar_config_for_session = self.avatar_config.clone();
671
672        // ===== RESOLVE TOOLSETS =====
673        let mut resolved_tools: Vec<Arc<dyn Tool>> = tools.clone();
674        let static_tool_names: std::collections::HashSet<String> =
675            tools.iter().map(|t| t.name().to_string()).collect();
676        let mut toolset_source: std::collections::HashMap<String, String> =
677            std::collections::HashMap::new();
678
679        for toolset in &toolsets {
680            let toolset_tools = toolset.tools(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
681            for tool in &toolset_tools {
682                let name = tool.name().to_string();
683                if static_tool_names.contains(&name) {
684                    return Err(AdkError::agent(format!(
685                        "Duplicate tool name '{}': conflict between static tool and toolset '{}'",
686                        name,
687                        toolset.name()
688                    )));
689                }
690                if let Some(other_toolset_name) = toolset_source.get(&name) {
691                    return Err(AdkError::agent(format!(
692                        "Duplicate tool name '{}': conflict between toolset '{}' and toolset '{}'",
693                        name,
694                        other_toolset_name,
695                        toolset.name()
696                    )));
697                }
698                toolset_source.insert(name, toolset.name().to_string());
699                resolved_tools.push(tool.clone());
700            }
701        }
702
703        // Build config with resolved tools
704        let config = self.build_config(&ctx, &resolved_tools).await?;
705
706        let s = stream! {
707            // ===== BEFORE AGENT CALLBACKS =====
708            for callback in before_callbacks.as_ref() {
709                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
710                    Ok(Some(content)) => {
711                        let mut early_event = Event::new(&invocation_id);
712                        early_event.author = agent_name.clone();
713                        early_event.llm_response.content = Some(content);
714                        yield Ok(early_event);
715                        return;
716                    }
717                    Ok(None) => continue,
718                    Err(e) => {
719                        yield Err(e);
720                        return;
721                    }
722                }
723            }
724
725            // ===== CONNECT TO REALTIME SESSION =====
726            let session = match model.connect(config).await {
727                Ok(s) => s,
728                Err(e) => {
729                    yield Err(AdkError::model(format!("Failed to connect: {}", e)));
730                    return;
731                }
732            };
733
734            // Yield session started event
735            let mut start_event = Event::new(&invocation_id);
736            start_event.author = agent_name.clone();
737            start_event.llm_response.content = Some(Content {
738                role: "system".to_string(),
739                parts: vec![Part::Text {
740                    text: format!("Realtime session started: {}", session.session_id()),
741                }],
742            });
743            yield Ok(start_event);
744
745            // ===== START AVATAR SESSION (if configured) =====
746            #[cfg(feature = "video-avatar")]
747            let avatar_session_id: Option<String> = {
748                if let (Some(provider), Some(config)) = (&avatar_provider, &avatar_config_for_session) {
749                    match provider.start_session(config).await {
750                        Ok(session_info) => {
751                            tracing::info!(
752                                provider = %session_info.provider,
753                                session_id = %session_info.session_id,
754                                "avatar session started"
755                            );
756                            // Emit avatar session info as an event for the client
757                            let mut avatar_event = Event::new(&invocation_id);
758                            avatar_event.author = agent_name.clone();
759                            avatar_event.llm_response.content = Some(Content {
760                                role: "system".to_string(),
761                                parts: vec![Part::Text {
762                                    text: serde_json::to_string(&session_info).unwrap_or_default(),
763                                }],
764                            });
765                            yield Ok(avatar_event);
766                            Some(session_info.session_id)
767                        }
768                        Err(e) => {
769                            // Graceful degradation: log warning, continue audio-only
770                            tracing::warn!(
771                                error = %e,
772                                "avatar session creation failed, falling back to audio-only"
773                            );
774                            None
775                        }
776                    }
777                } else {
778                    None
779                }
780            };
781            #[cfg(not(feature = "video-avatar"))]
782            let _avatar_session_id: Option<String> = None;
783
784            // Spawn keep-alive task for avatar session
785            #[cfg(feature = "video-avatar")]
786            let _avatar_keep_alive_handle: Option<tokio::task::JoinHandle<()>> = {
787                if let (Some(provider), Some(sess_id)) = (&avatar_provider, &avatar_session_id) {
788                    Some(crate::avatar::spawn_keep_alive(
789                        provider.clone(),
790                        sess_id.clone(),
791                        std::time::Duration::from_secs(30),
792                    ))
793                } else {
794                    None
795                }
796            };
797
798            // ===== SEND INITIAL USER CONTENT =====
799            // If user provided text input, send it to start the conversation
800            let user_content = ctx.user_content();
801            for part in &user_content.parts {
802                if let Part::Text { text } = part {
803                    if let Err(e) = session.send_text(text).await {
804                        yield Err(AdkError::model(format!("Failed to send text: {}", e)));
805                        return;
806                    }
807                    // Request a response
808                    if let Err(e) = session.create_response().await {
809                        yield Err(AdkError::model(format!("Failed to create response: {}", e)));
810                        return;
811                    }
812                }
813            }
814
815            // ===== PROCESS REALTIME EVENTS =====
816            loop {
817                let event = session.next_event().await;
818
819                match event {
820                    Some(Ok(server_event)) => {
821                        match server_event {
822                            ServerEvent::AudioDelta { delta, item_id, .. } => {
823                                // Route audio through avatar provider if active
824                                #[cfg(feature = "video-avatar")]
825                                if let (Some(provider), Some(sess_id)) = (&avatar_provider, &avatar_session_id) {
826                                    if let Err(e) = provider.send_audio(sess_id, &delta).await {
827                                        tracing::warn!(error = %e, "avatar send_audio failed");
828                                    }
829                                    // Don't yield raw audio to client — avatar provides video+audio
830                                    // Still call the on_audio callback for monitoring
831                                    if let Some(ref cb) = on_audio {
832                                        cb(&delta, &item_id).await;
833                                    }
834                                    continue;
835                                }
836
837                                // No avatar provider — send raw audio to client
838                                if let Some(ref cb) = on_audio {
839                                    cb(&delta, &item_id).await;
840                                }
841
842                                // Yield audio event (delta is already raw bytes)
843                                let mut audio_event = Event::new(&invocation_id);
844                                audio_event.author = agent_name.clone();
845                                audio_event.llm_response.content = Some(Content {
846                                    role: "model".to_string(),
847                                    parts: vec![Part::InlineData {
848                                        mime_type: "audio/pcm".to_string(),
849                                        data: delta,
850                                    }],
851                                });
852                                yield Ok(audio_event);
853                            }
854
855                            ServerEvent::TextDelta { delta, .. } => {
856                                let mut text_event = Event::new(&invocation_id);
857                                text_event.author = agent_name.clone();
858                                text_event.llm_response.content = Some(Content {
859                                    role: "model".to_string(),
860                                    parts: vec![Part::Text { text: delta.clone() }],
861                                });
862                                yield Ok(text_event);
863                            }
864
865                            ServerEvent::TranscriptDelta { delta, item_id, .. } => {
866                                if let Some(ref cb) = on_transcript {
867                                    cb(&delta, &item_id).await;
868                                }
869                            }
870
871                            ServerEvent::SpeechStarted { audio_start_ms, .. } => {
872                                if let Some(ref cb) = on_speech_started {
873                                    cb(audio_start_ms).await;
874                                }
875                            }
876
877                            ServerEvent::SpeechStopped { audio_end_ms, .. } => {
878                                if let Some(ref cb) = on_speech_stopped {
879                                    cb(audio_end_ms).await;
880                                }
881                            }
882
883                            ServerEvent::FunctionCallDone {
884                                call_id,
885                                name,
886                                arguments,
887                                ..
888                            } => {
889                                // Handle transfer_to_agent
890                                if name == "transfer_to_agent" {
891                                    let args: serde_json::Value = serde_json::from_str(&arguments)
892                                        .unwrap_or(serde_json::json!({}));
893                                    let target = args.get("agent_name")
894                                        .and_then(|v| v.as_str())
895                                        .unwrap_or_default()
896                                        .to_string();
897
898                                    let mut transfer_event = Event::new(&invocation_id);
899                                    transfer_event.author = agent_name.clone();
900                                    transfer_event.actions.transfer_to_agent = Some(target);
901                                    yield Ok(transfer_event);
902
903                                    let _ = session.close().await;
904                                    return;
905                                }
906
907                                // Execute tool
908                                let tool = resolved_tools.iter().find(|t| t.name() == name);
909
910                                let (result, actions) = if let Some(tool) = tool {
911                                    let args: serde_json::Value = serde_json::from_str(&arguments)
912                                        .unwrap_or(serde_json::json!({}));
913
914                                    let tool_ctx: Arc<dyn ToolContext> = Arc::new(
915                                        RealtimeToolContext::new(ctx.clone(), call_id.clone())
916                                    );
917
918                                    // Execute before_tool callbacks
919                                    let tool_cb_ctx = Arc::new(ToolCallbackContext::new(
920                                        ctx.clone(),
921                                        name.clone(),
922                                        args.clone(),
923                                    ));
924                                    for callback in before_tool_callbacks.as_ref() {
925                                        if let Err(e) = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await {
926                                            let error_result = serde_json::json!({ "error": e.to_string() });
927                                            (error_result, EventActions::default())
928                                        } else {
929                                            continue;
930                                        };
931                                    }
932
933                                    let result = match tool.execute(tool_ctx.clone(), args.clone()).await {
934                                        Ok(r) => r,
935                                        Err(e) => serde_json::json!({ "error": e.to_string() }),
936                                    };
937
938                                    let actions = tool_ctx.actions();
939
940                                    // Execute after_tool callbacks
941                                    let tool_cb_ctx = Arc::new(ToolCallbackContext::new(
942                                        ctx.clone(),
943                                        name.clone(),
944                                        args.clone(),
945                                    ));
946                                    for callback in after_tool_callbacks.as_ref() {
947                                        let _ = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await;
948                                    }
949
950                                    (result, actions)
951                                } else {
952                                    (
953                                        serde_json::json!({ "error": format!("Tool {} not found", name) }),
954                                        EventActions::default(),
955                                    )
956                                };
957
958                                // Yield tool event
959                                let mut tool_event = Event::new(&invocation_id);
960                                tool_event.author = agent_name.clone();
961                                tool_event.actions = actions.clone();
962                                tool_event.llm_response.content = Some(Content {
963                                    role: "function".to_string(),
964                                    parts: vec![Part::FunctionResponse {
965                                        function_response: adk_core::FunctionResponseData::new(name.clone(), result.clone()),
966                                        id: Some(call_id.clone()),
967                                    }],
968                                });
969                                yield Ok(tool_event);
970
971                                // Check for escalation
972                                if actions.escalate || actions.skip_summarization {
973                                    let _ = session.close().await;
974                                    return;
975                                }
976
977                                // Send tool response back to session
978                                let response = ToolResponse {
979                                    call_id,
980                                    output: result,
981                                };
982                                if let Err(e) = session.send_tool_response(response).await {
983                                    yield Err(AdkError::model(format!("Failed to send tool response: {}", e)));
984                                    let _ = session.close().await;
985                                    return;
986                                }
987                            }
988
989                            ServerEvent::ResponseDone { .. } => {
990                                // Response complete, continue listening
991                            }
992
993                            ServerEvent::Error { error, .. } => {
994                                yield Err(AdkError::model(format!(
995                                    "Realtime error: {} - {}",
996                                    error.code.unwrap_or_default(),
997                                    error.message
998                                )));
999                            }
1000
1001
1002                            _ => {
1003                                // Ignore other events
1004                            }
1005                        }
1006                    }
1007                    Some(Err(e)) => {
1008                        yield Err(AdkError::model(format!("Session error: {}", e)));
1009                        break;
1010                    }
1011                    None => {
1012                        // Session closed
1013                        break;
1014                    }
1015                }
1016            }
1017
1018            // ===== STOP AVATAR SESSION (cleanup) =====
1019            #[cfg(feature = "video-avatar")]
1020            {
1021                // Abort keep-alive task
1022                if let Some(handle) = _avatar_keep_alive_handle {
1023                    handle.abort();
1024                }
1025                // Stop the avatar session
1026                if let (Some(provider), Some(sess_id)) = (&avatar_provider, &avatar_session_id) {
1027                    if let Err(e) = provider.stop_session(sess_id).await {
1028                        tracing::warn!(error = %e, "avatar session cleanup failed");
1029                    }
1030                }
1031            }
1032
1033            // ===== AFTER AGENT CALLBACKS =====
1034            for callback in after_callbacks.as_ref() {
1035                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1036                    Ok(Some(content)) => {
1037                        let mut after_event = Event::new(&invocation_id);
1038                        after_event.author = agent_name.clone();
1039                        after_event.llm_response.content = Some(content);
1040                        yield Ok(after_event);
1041                        break;
1042                    }
1043                    Ok(None) => continue,
1044                    Err(e) => {
1045                        yield Err(e);
1046                        return;
1047                    }
1048                }
1049            }
1050        };
1051
1052        Ok(Box::pin(s))
1053    }
1054}
1055
/// Tool context for realtime agent tool execution.
///
/// Built for a function call that the realtime session asks the agent to
/// execute. Identity, artifact, and memory lookups are forwarded to the parent
/// invocation context, while the event actions for the call are stored here.
struct RealtimeToolContext {
    /// Invocation context the tool call runs under; the context trait impls
    /// forward their getters to it.
    parent_ctx: Arc<dyn InvocationContext>,
    /// Identifier of the function call, as returned by `function_call_id()`.
    function_call_id: String,
    /// Current event actions for this call. Kept behind a `Mutex` because
    /// `set_actions` replaces the value through a shared `&self` reference.
    actions: Mutex<EventActions>,
}
1062
1063impl RealtimeToolContext {
1064    fn new(parent_ctx: Arc<dyn InvocationContext>, function_call_id: String) -> Self {
1065        Self { parent_ctx, function_call_id, actions: Mutex::new(EventActions::default()) }
1066    }
1067}
1068
1069#[async_trait]
1070impl ReadonlyContext for RealtimeToolContext {
1071    fn invocation_id(&self) -> &str {
1072        self.parent_ctx.invocation_id()
1073    }
1074
1075    fn agent_name(&self) -> &str {
1076        self.parent_ctx.agent_name()
1077    }
1078
1079    fn user_id(&self) -> &str {
1080        self.parent_ctx.user_id()
1081    }
1082
1083    fn app_name(&self) -> &str {
1084        self.parent_ctx.app_name()
1085    }
1086
1087    fn session_id(&self) -> &str {
1088        self.parent_ctx.session_id()
1089    }
1090
1091    fn branch(&self) -> &str {
1092        self.parent_ctx.branch()
1093    }
1094
1095    fn user_content(&self) -> &Content {
1096        self.parent_ctx.user_content()
1097    }
1098}
1099
1100#[async_trait]
1101impl CallbackContext for RealtimeToolContext {
1102    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
1103        self.parent_ctx.artifacts()
1104    }
1105}
1106
1107#[async_trait]
1108impl ToolContext for RealtimeToolContext {
1109    fn function_call_id(&self) -> &str {
1110        &self.function_call_id
1111    }
1112
1113    fn actions(&self) -> EventActions {
1114        self.actions.lock().unwrap().clone()
1115    }
1116
1117    fn set_actions(&self, actions: EventActions) {
1118        *self.actions.lock().unwrap() = actions;
1119    }
1120
1121    async fn search_memory(&self, query: &str) -> Result<Vec<MemoryEntry>> {
1122        if let Some(memory) = self.parent_ctx.memory() {
1123            memory.search(query).await
1124        } else {
1125            Ok(vec![])
1126        }
1127    }
1128}