Skip to main content

adk_realtime/
agent.rs

1//! RealtimeAgent - an Agent implementation for real-time voice interactions.
2//!
3//! This module provides `RealtimeAgent`, which implements the `adk_core::Agent` trait
4//! and provides the same callback/tool/instruction features as `LlmAgent`, but uses
5//! real-time bidirectional audio streaming instead of text-based LLM calls.
6//!
7//! # Architecture
8//!
9//! ```text
10//!                     ┌─────────────────────────────────────────┐
11//!                     │              Agent Trait                │
12//!                     │  (name, description, run, sub_agents)   │
13//!                     └────────────────┬────────────────────────┘
14//!                                      │
15//!              ┌───────────────────────┼───────────────────────┐
16//!              │                       │                       │
17//!     ┌────────▼────────┐    ┌─────────▼─────────┐   ┌─────────▼─────────┐
18//!     │    LlmAgent     │    │  RealtimeAgent    │   │  SequentialAgent  │
19//!     │  (text-based)   │    │  (voice-based)    │   │   (workflow)      │
20//!     └─────────────────┘    └───────────────────┘   └───────────────────┘
21//! ```
22//!
23//! # Shared Features with LlmAgent
24//!
25//! - **Tools**: Function tools that can be called during conversation
26//! - **Callbacks**: before_agent, after_agent, before_tool, after_tool
27//! - **Instructions**: Static or dynamic instruction providers
28//! - **Sub-agents**: Agent handoff/transfer support
29//! - **Context**: Full access to InvocationContext (session, memory, artifacts)
30//!
31//! # Example
32//!
33//! ```rust,ignore
34//! use adk_realtime::RealtimeAgent;
35//! use adk_realtime::openai::OpenAIRealtimeModel;
36//!
37//! let model = OpenAIRealtimeModel::new(api_key, "gpt-4o-realtime-preview-2024-12-17");
38//!
39//! let agent = RealtimeAgent::builder("voice_assistant")
//!     .model(Arc::new(model))
41//!     .instruction("You are a helpful voice assistant.")
42//!     .voice("alloy")
43//!     .tool(Arc::new(weather_tool))
44//!     .before_agent_callback(|ctx| async move {
45//!         println!("Starting voice session for user: {}", ctx.user_id());
46//!         Ok(None)
47//!     })
48//!     .build()?;
49//!
50//! // Run through standard ADK runner
51//! let runner = Runner::new(agent);
52//! runner.run(session, user_content).await?;
53//! ```
54
55use crate::config::{RealtimeConfig, ToolDefinition, VadConfig, VadMode};
56use crate::events::{ServerEvent, ToolResponse};
57use adk_core::{
58    AdkError, AfterAgentCallback, AfterToolCallback, Agent, BeforeAgentCallback,
59    BeforeToolCallback, CallbackContext, Content, Event, EventActions, EventStream,
60    GlobalInstructionProvider, InstructionProvider, InvocationContext, MemoryEntry, Part,
61    ReadonlyContext, Result, Tool, ToolContext, Toolset,
62};
63use async_stream::stream;
64use async_trait::async_trait;
65
66use std::sync::{Arc, Mutex};
67
68/// Shared realtime model type (thread-safe for async usage).
69pub type BoxedRealtimeModel = Arc<dyn crate::model::RealtimeModel>;
70
71/// A real-time voice agent that implements the ADK Agent trait.
72///
73/// `RealtimeAgent` provides bidirectional audio streaming while maintaining
74/// compatibility with the standard ADK agent ecosystem. It supports the same
75/// callbacks, tools, and instruction patterns as `LlmAgent`.
76pub struct RealtimeAgent {
77    name: String,
78    description: String,
79    model: BoxedRealtimeModel,
80
81    // Instructions (same as LlmAgent)
82    instruction: Option<String>,
83    instruction_provider: Option<Arc<InstructionProvider>>,
84    global_instruction: Option<String>,
85    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
86
87    // Voice-specific settings
88    voice: Option<String>,
89    vad_config: Option<VadConfig>,
90    modalities: Vec<String>,
91
92    // Tools (same as LlmAgent)
93    tools: Vec<Arc<dyn Tool>>,
94    toolsets: Vec<Arc<dyn Toolset>>,
95    sub_agents: Vec<Arc<dyn Agent>>,
96
97    // Callbacks (same as LlmAgent)
98    before_callbacks: Arc<Vec<BeforeAgentCallback>>,
99    after_callbacks: Arc<Vec<AfterAgentCallback>>,
100    before_tool_callbacks: Arc<Vec<BeforeToolCallback>>,
101    after_tool_callbacks: Arc<Vec<AfterToolCallback>>,
102
103    // Realtime-specific callbacks
104    on_audio: Option<AudioCallback>,
105    on_transcript: Option<TranscriptCallback>,
106    on_speech_started: Option<SpeechCallback>,
107    on_speech_stopped: Option<SpeechCallback>,
108}
109
/// Async hook fired for each chunk of model audio output.
///
/// Arguments: the raw PCM bytes of the audio delta, then the id of the conversation item the
/// audio belongs to. The returned future is awaited before the audio event is yielded.
pub type AudioCallback = Arc<
    dyn for<'a, 'b> Fn(
            &'a [u8],
            &'b str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Sync
        + Send,
>;
116
/// Async hook fired for each transcript delta.
///
/// Arguments: the transcript text fragment, then the id of the conversation item it
/// belongs to.
pub type TranscriptCallback = Arc<
    dyn for<'a, 'b> Fn(
            &'a str,
            &'b str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Sync
        + Send,
>;
123
/// Async hook fired when speech starts or stops.
///
/// The single argument is the audio timestamp in milliseconds reported by the server
/// (`audio_start_ms` for speech start, `audio_end_ms` for speech stop).
pub type SpeechCallback = Arc<
    dyn Fn(u64) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>
        + Sync
        + Send
        + 'static,
>;
128
129impl std::fmt::Debug for RealtimeAgent {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("RealtimeAgent")
132            .field("name", &self.name)
133            .field("description", &self.description)
134            .field("model", &self.model.model_id())
135            .field("voice", &self.voice)
136            .field("tools_count", &self.tools.len())
137            .field("toolsets_count", &self.toolsets.len())
138            .field("sub_agents_count", &self.sub_agents.len())
139            .finish()
140    }
141}
142
143/// Builder for RealtimeAgent.
144pub struct RealtimeAgentBuilder {
145    name: String,
146    description: Option<String>,
147    model: Option<BoxedRealtimeModel>,
148    instruction: Option<String>,
149    instruction_provider: Option<Arc<InstructionProvider>>,
150    global_instruction: Option<String>,
151    global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
152    voice: Option<String>,
153    vad_config: Option<VadConfig>,
154    modalities: Vec<String>,
155    tools: Vec<Arc<dyn Tool>>,
156    toolsets: Vec<Arc<dyn Toolset>>,
157    sub_agents: Vec<Arc<dyn Agent>>,
158    before_callbacks: Vec<BeforeAgentCallback>,
159    after_callbacks: Vec<AfterAgentCallback>,
160    before_tool_callbacks: Vec<BeforeToolCallback>,
161    after_tool_callbacks: Vec<AfterToolCallback>,
162    on_audio: Option<AudioCallback>,
163    on_transcript: Option<TranscriptCallback>,
164    on_speech_started: Option<SpeechCallback>,
165    on_speech_stopped: Option<SpeechCallback>,
166}
167
168impl RealtimeAgentBuilder {
169    /// Create a new builder with the given agent name.
170    pub fn new(name: impl Into<String>) -> Self {
171        Self {
172            name: name.into(),
173            description: None,
174            model: None,
175            instruction: None,
176            instruction_provider: None,
177            global_instruction: None,
178            global_instruction_provider: None,
179            voice: None,
180            vad_config: None,
181            modalities: vec!["text".to_string(), "audio".to_string()],
182            tools: Vec::new(),
183            toolsets: Vec::new(),
184            sub_agents: Vec::new(),
185            before_callbacks: Vec::new(),
186            after_callbacks: Vec::new(),
187            before_tool_callbacks: Vec::new(),
188            after_tool_callbacks: Vec::new(),
189            on_audio: None,
190            on_transcript: None,
191            on_speech_started: None,
192            on_speech_stopped: None,
193        }
194    }
195
196    /// Set the agent description.
197    pub fn description(mut self, desc: impl Into<String>) -> Self {
198        self.description = Some(desc.into());
199        self
200    }
201
202    /// Set the realtime model.
203    pub fn model(mut self, model: BoxedRealtimeModel) -> Self {
204        self.model = Some(model);
205        self
206    }
207
208    /// Set a static instruction.
209    pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
210        self.instruction = Some(instruction.into());
211        self
212    }
213
214    /// Set a dynamic instruction provider.
215    pub fn instruction_provider(mut self, provider: InstructionProvider) -> Self {
216        self.instruction_provider = Some(Arc::new(provider));
217        self
218    }
219
220    /// Set a static global instruction.
221    pub fn global_instruction(mut self, instruction: impl Into<String>) -> Self {
222        self.global_instruction = Some(instruction.into());
223        self
224    }
225
226    /// Set a dynamic global instruction provider.
227    pub fn global_instruction_provider(mut self, provider: GlobalInstructionProvider) -> Self {
228        self.global_instruction_provider = Some(Arc::new(provider));
229        self
230    }
231
232    /// Set the voice for audio output.
233    pub fn voice(mut self, voice: impl Into<String>) -> Self {
234        self.voice = Some(voice.into());
235        self
236    }
237
238    /// Set voice activity detection configuration.
239    pub fn vad(mut self, config: VadConfig) -> Self {
240        self.vad_config = Some(config);
241        self
242    }
243
244    /// Enable server-side VAD with default settings.
245    pub fn server_vad(mut self) -> Self {
246        self.vad_config = Some(VadConfig {
247            mode: VadMode::ServerVad,
248            threshold: Some(0.5),
249            prefix_padding_ms: Some(300),
250            silence_duration_ms: Some(500),
251            interrupt_response: Some(true),
252            eagerness: None,
253        });
254        self
255    }
256
257    /// Set output modalities (e.g., ["text", "audio"]).
258    pub fn modalities(mut self, modalities: Vec<String>) -> Self {
259        self.modalities = modalities;
260        self
261    }
262
263    /// Add a tool.
264    pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
265        self.tools.push(tool);
266        self
267    }
268
269    /// Register a dynamic toolset for per-invocation tool resolution.
270    ///
271    /// Toolsets are resolved at the start of each `run()` call using the
272    /// invocation's `ReadonlyContext`. This enables context-dependent tools
273    /// like per-user browser sessions from a pool.
274    pub fn toolset(mut self, toolset: Arc<dyn Toolset>) -> Self {
275        self.toolsets.push(toolset);
276        self
277    }
278
279    /// Add a sub-agent for handoffs.
280    pub fn sub_agent(mut self, agent: Arc<dyn Agent>) -> Self {
281        self.sub_agents.push(agent);
282        self
283    }
284
285    /// Add a before-agent callback.
286    pub fn before_agent_callback(mut self, callback: BeforeAgentCallback) -> Self {
287        self.before_callbacks.push(callback);
288        self
289    }
290
291    /// Add an after-agent callback.
292    pub fn after_agent_callback(mut self, callback: AfterAgentCallback) -> Self {
293        self.after_callbacks.push(callback);
294        self
295    }
296
297    /// Add a before-tool callback.
298    pub fn before_tool_callback(mut self, callback: BeforeToolCallback) -> Self {
299        self.before_tool_callbacks.push(callback);
300        self
301    }
302
303    /// Add an after-tool callback.
304    pub fn after_tool_callback(mut self, callback: AfterToolCallback) -> Self {
305        self.after_tool_callbacks.push(callback);
306        self
307    }
308
309    /// Set callback for audio output events.
310    pub fn on_audio(mut self, callback: AudioCallback) -> Self {
311        self.on_audio = Some(callback);
312        self
313    }
314
315    /// Set callback for transcript events.
316    pub fn on_transcript(mut self, callback: TranscriptCallback) -> Self {
317        self.on_transcript = Some(callback);
318        self
319    }
320
321    /// Set callback for speech started events.
322    pub fn on_speech_started(mut self, callback: SpeechCallback) -> Self {
323        self.on_speech_started = Some(callback);
324        self
325    }
326
327    /// Set callback for speech stopped events.
328    pub fn on_speech_stopped(mut self, callback: SpeechCallback) -> Self {
329        self.on_speech_stopped = Some(callback);
330        self
331    }
332
333    /// Build the RealtimeAgent.
334    pub fn build(self) -> Result<RealtimeAgent> {
335        let model =
336            self.model.ok_or_else(|| AdkError::agent("RealtimeModel is required".to_string()))?;
337
338        Ok(RealtimeAgent {
339            name: self.name,
340            description: self.description.unwrap_or_default(),
341            model,
342            instruction: self.instruction,
343            instruction_provider: self.instruction_provider,
344            global_instruction: self.global_instruction,
345            global_instruction_provider: self.global_instruction_provider,
346            voice: self.voice,
347            vad_config: self.vad_config,
348            modalities: self.modalities,
349            tools: self.tools,
350            toolsets: self.toolsets,
351            sub_agents: self.sub_agents,
352            before_callbacks: Arc::new(self.before_callbacks),
353            after_callbacks: Arc::new(self.after_callbacks),
354            before_tool_callbacks: Arc::new(self.before_tool_callbacks),
355            after_tool_callbacks: Arc::new(self.after_tool_callbacks),
356            on_audio: self.on_audio,
357            on_transcript: self.on_transcript,
358            on_speech_started: self.on_speech_started,
359            on_speech_stopped: self.on_speech_stopped,
360        })
361    }
362}
363
364impl RealtimeAgent {
365    /// Create a new builder.
366    pub fn builder(name: impl Into<String>) -> RealtimeAgentBuilder {
367        RealtimeAgentBuilder::new(name)
368    }
369
370    /// Get the static instruction, if set.
371    pub fn instruction(&self) -> Option<&String> {
372        self.instruction.as_ref()
373    }
374
375    /// Get the voice setting, if set.
376    pub fn voice(&self) -> Option<&String> {
377        self.voice.as_ref()
378    }
379
380    /// Get the VAD configuration, if set.
381    pub fn vad_config(&self) -> Option<&VadConfig> {
382        self.vad_config.as_ref()
383    }
384
385    /// Get the list of tools.
386    pub fn tools(&self) -> &[Arc<dyn Tool>] {
387        &self.tools
388    }
389
390    /// Build the realtime configuration from agent settings.
391    async fn build_config(
392        &self,
393        ctx: &Arc<dyn InvocationContext>,
394        resolved_tools: &[Arc<dyn Tool>],
395    ) -> Result<RealtimeConfig> {
396        let mut config = RealtimeConfig::default();
397
398        // Build instruction from providers or static value
399        if let Some(provider) = &self.global_instruction_provider {
400            let global_inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
401            if !global_inst.is_empty() {
402                config.instruction = Some(global_inst);
403            }
404        } else if let Some(ref template) = self.global_instruction {
405            let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
406            config.instruction = Some(processed);
407        }
408
409        // Add agent-specific instruction
410        if let Some(provider) = &self.instruction_provider {
411            let inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
412            if !inst.is_empty() {
413                if let Some(existing) = &mut config.instruction {
414                    existing.push_str("\n\n");
415                    existing.push_str(&inst);
416                } else {
417                    config.instruction = Some(inst);
418                }
419            }
420        } else if let Some(ref template) = self.instruction {
421            let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
422            if let Some(existing) = &mut config.instruction {
423                existing.push_str("\n\n");
424                existing.push_str(&processed);
425            } else {
426                config.instruction = Some(processed);
427            }
428        }
429
430        // Voice settings
431        config.voice = self.voice.clone();
432        config.turn_detection = self.vad_config.clone();
433        config.modalities = Some(self.modalities.clone());
434
435        // Convert ADK tools to realtime tool definitions
436        let tool_defs: Vec<ToolDefinition> = resolved_tools
437            .iter()
438            .map(|t| ToolDefinition {
439                name: t.name().to_string(),
440                description: Some(t.enhanced_description().to_string()),
441                parameters: t.parameters_schema(),
442            })
443            .collect();
444
445        if !tool_defs.is_empty() {
446            config.tools = Some(tool_defs);
447        }
448
449        // Add transfer_to_agent tool if sub-agents exist
450        if !self.sub_agents.is_empty() {
451            let mut tools = config.tools.unwrap_or_default();
452            tools.push(ToolDefinition {
453                name: "transfer_to_agent".to_string(),
454                description: Some("Transfer execution to another agent.".to_string()),
455                parameters: Some(serde_json::json!({
456                    "type": "object",
457                    "properties": {
458                        "agent_name": {
459                            "type": "string",
460                            "description": "The name of the agent to transfer to."
461                        }
462                    },
463                    "required": ["agent_name"]
464                })),
465            });
466            config.tools = Some(tools);
467        }
468
469        Ok(config)
470    }
471
472    /// Execute a tool call.
473    #[allow(dead_code)]
474    async fn execute_tool(
475        &self,
476        ctx: &Arc<dyn InvocationContext>,
477        call_id: &str,
478        name: &str,
479        arguments: &str,
480    ) -> (serde_json::Value, EventActions) {
481        // Find the tool
482        let tool = self.tools.iter().find(|t| t.name() == name);
483
484        if let Some(tool) = tool {
485            let args: serde_json::Value =
486                serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
487
488            // Create tool context
489            let tool_ctx: Arc<dyn ToolContext> =
490                Arc::new(RealtimeToolContext::new(ctx.clone(), call_id.to_string()));
491
492            // Execute before_tool callbacks
493            for callback in self.before_tool_callbacks.as_ref() {
494                if let Err(e) = callback(ctx.clone() as Arc<dyn CallbackContext>).await {
495                    return (
496                        serde_json::json!({ "error": e.to_string() }),
497                        EventActions::default(),
498                    );
499                }
500            }
501
502            // Execute the tool
503            let result = match tool.execute(tool_ctx.clone(), args).await {
504                Ok(result) => result,
505                Err(e) => serde_json::json!({ "error": e.to_string() }),
506            };
507
508            let actions = tool_ctx.actions();
509
510            // Execute after_tool callbacks
511            for callback in self.after_tool_callbacks.as_ref() {
512                if let Err(e) = callback(ctx.clone() as Arc<dyn CallbackContext>).await {
513                    return (serde_json::json!({ "error": e.to_string() }), actions);
514                }
515            }
516
517            (result, actions)
518        } else {
519            (
520                serde_json::json!({ "error": format!("Tool {} not found", name) }),
521                EventActions::default(),
522            )
523        }
524    }
525}
526
527#[async_trait]
528impl Agent for RealtimeAgent {
529    fn name(&self) -> &str {
530        &self.name
531    }
532
533    fn description(&self) -> &str {
534        &self.description
535    }
536
537    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
538        &self.sub_agents
539    }
540
541    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
542        let agent_name = self.name.clone();
543        let invocation_id = ctx.invocation_id().to_string();
544        let model = self.model.clone();
545        let _sub_agents = self.sub_agents.clone();
546
547        // Clone callback refs
548        let before_callbacks = self.before_callbacks.clone();
549        let after_callbacks = self.after_callbacks.clone();
550        let before_tool_callbacks = self.before_tool_callbacks.clone();
551        let after_tool_callbacks = self.after_tool_callbacks.clone();
552        let tools = self.tools.clone();
553        let toolsets = self.toolsets.clone();
554
555        // Clone realtime callbacks
556        let on_audio = self.on_audio.clone();
557        let on_transcript = self.on_transcript.clone();
558        let on_speech_started = self.on_speech_started.clone();
559        let on_speech_stopped = self.on_speech_stopped.clone();
560
561        // ===== RESOLVE TOOLSETS =====
562        let mut resolved_tools: Vec<Arc<dyn Tool>> = tools.clone();
563        let static_tool_names: std::collections::HashSet<String> =
564            tools.iter().map(|t| t.name().to_string()).collect();
565        let mut toolset_source: std::collections::HashMap<String, String> =
566            std::collections::HashMap::new();
567
568        for toolset in &toolsets {
569            let toolset_tools = toolset.tools(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
570            for tool in &toolset_tools {
571                let name = tool.name().to_string();
572                if static_tool_names.contains(&name) {
573                    return Err(AdkError::agent(format!(
574                        "Duplicate tool name '{}': conflict between static tool and toolset '{}'",
575                        name,
576                        toolset.name()
577                    )));
578                }
579                if let Some(other_toolset_name) = toolset_source.get(&name) {
580                    return Err(AdkError::agent(format!(
581                        "Duplicate tool name '{}': conflict between toolset '{}' and toolset '{}'",
582                        name,
583                        other_toolset_name,
584                        toolset.name()
585                    )));
586                }
587                toolset_source.insert(name, toolset.name().to_string());
588                resolved_tools.push(tool.clone());
589            }
590        }
591
592        // Build config with resolved tools
593        let config = self.build_config(&ctx, &resolved_tools).await?;
594
595        let s = stream! {
596            // ===== BEFORE AGENT CALLBACKS =====
597            for callback in before_callbacks.as_ref() {
598                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
599                    Ok(Some(content)) => {
600                        let mut early_event = Event::new(&invocation_id);
601                        early_event.author = agent_name.clone();
602                        early_event.llm_response.content = Some(content);
603                        yield Ok(early_event);
604                        return;
605                    }
606                    Ok(None) => continue,
607                    Err(e) => {
608                        yield Err(e);
609                        return;
610                    }
611                }
612            }
613
614            // ===== CONNECT TO REALTIME SESSION =====
615            let session = match model.connect(config).await {
616                Ok(s) => s,
617                Err(e) => {
618                    yield Err(AdkError::model(format!("Failed to connect: {}", e)));
619                    return;
620                }
621            };
622
623            // Yield session started event
624            let mut start_event = Event::new(&invocation_id);
625            start_event.author = agent_name.clone();
626            start_event.llm_response.content = Some(Content {
627                role: "system".to_string(),
628                parts: vec![Part::Text {
629                    text: format!("Realtime session started: {}", session.session_id()),
630                }],
631            });
632            yield Ok(start_event);
633
634            // ===== SEND INITIAL USER CONTENT =====
635            // If user provided text input, send it to start the conversation
636            let user_content = ctx.user_content();
637            for part in &user_content.parts {
638                if let Part::Text { text } = part {
639                    if let Err(e) = session.send_text(text).await {
640                        yield Err(AdkError::model(format!("Failed to send text: {}", e)));
641                        return;
642                    }
643                    // Request a response
644                    if let Err(e) = session.create_response().await {
645                        yield Err(AdkError::model(format!("Failed to create response: {}", e)));
646                        return;
647                    }
648                }
649            }
650
651            // ===== PROCESS REALTIME EVENTS =====
652            loop {
653                let event = session.next_event().await;
654
655                match event {
656                    Some(Ok(server_event)) => {
657                        match server_event {
658                            ServerEvent::AudioDelta { delta, item_id, .. } => {
659                                // Call audio callback if set
660                                if let Some(ref cb) = on_audio {
661                                    cb(&delta, &item_id).await;
662                                }
663
664                                // Yield audio event (delta is already raw bytes)
665                                let mut audio_event = Event::new(&invocation_id);
666                                audio_event.author = agent_name.clone();
667                                audio_event.llm_response.content = Some(Content {
668                                    role: "model".to_string(),
669                                    parts: vec![Part::InlineData {
670                                        mime_type: "audio/pcm".to_string(),
671                                        data: delta,
672                                    }],
673                                });
674                                yield Ok(audio_event);
675                            }
676
677                            ServerEvent::TextDelta { delta, .. } => {
678                                let mut text_event = Event::new(&invocation_id);
679                                text_event.author = agent_name.clone();
680                                text_event.llm_response.content = Some(Content {
681                                    role: "model".to_string(),
682                                    parts: vec![Part::Text { text: delta.clone() }],
683                                });
684                                yield Ok(text_event);
685                            }
686
687                            ServerEvent::TranscriptDelta { delta, item_id, .. } => {
688                                if let Some(ref cb) = on_transcript {
689                                    cb(&delta, &item_id).await;
690                                }
691                            }
692
693                            ServerEvent::SpeechStarted { audio_start_ms, .. } => {
694                                if let Some(ref cb) = on_speech_started {
695                                    cb(audio_start_ms).await;
696                                }
697                            }
698
699                            ServerEvent::SpeechStopped { audio_end_ms, .. } => {
700                                if let Some(ref cb) = on_speech_stopped {
701                                    cb(audio_end_ms).await;
702                                }
703                            }
704
705                            ServerEvent::FunctionCallDone {
706                                call_id,
707                                name,
708                                arguments,
709                                ..
710                            } => {
711                                // Handle transfer_to_agent
712                                if name == "transfer_to_agent" {
713                                    let args: serde_json::Value = serde_json::from_str(&arguments)
714                                        .unwrap_or(serde_json::json!({}));
715                                    let target = args.get("agent_name")
716                                        .and_then(|v| v.as_str())
717                                        .unwrap_or_default()
718                                        .to_string();
719
720                                    let mut transfer_event = Event::new(&invocation_id);
721                                    transfer_event.author = agent_name.clone();
722                                    transfer_event.actions.transfer_to_agent = Some(target);
723                                    yield Ok(transfer_event);
724
725                                    let _ = session.close().await;
726                                    return;
727                                }
728
729                                // Execute tool
730                                let tool = resolved_tools.iter().find(|t| t.name() == name);
731
732                                let (result, actions) = if let Some(tool) = tool {
733                                    let args: serde_json::Value = serde_json::from_str(&arguments)
734                                        .unwrap_or(serde_json::json!({}));
735
736                                    let tool_ctx: Arc<dyn ToolContext> = Arc::new(
737                                        RealtimeToolContext::new(ctx.clone(), call_id.clone())
738                                    );
739
740                                    // Execute before_tool callbacks
741                                    for callback in before_tool_callbacks.as_ref() {
742                                        if let Err(e) = callback(ctx.clone() as Arc<dyn CallbackContext>).await {
743                                            let error_result = serde_json::json!({ "error": e.to_string() });
744                                            (error_result, EventActions::default())
745                                        } else {
746                                            continue;
747                                        };
748                                    }
749
750                                    let result = match tool.execute(tool_ctx.clone(), args).await {
751                                        Ok(r) => r,
752                                        Err(e) => serde_json::json!({ "error": e.to_string() }),
753                                    };
754
755                                    let actions = tool_ctx.actions();
756
757                                    // Execute after_tool callbacks
758                                    for callback in after_tool_callbacks.as_ref() {
759                                        let _ = callback(ctx.clone() as Arc<dyn CallbackContext>).await;
760                                    }
761
762                                    (result, actions)
763                                } else {
764                                    (
765                                        serde_json::json!({ "error": format!("Tool {} not found", name) }),
766                                        EventActions::default(),
767                                    )
768                                };
769
770                                // Yield tool event
771                                let mut tool_event = Event::new(&invocation_id);
772                                tool_event.author = agent_name.clone();
773                                tool_event.actions = actions.clone();
774                                tool_event.llm_response.content = Some(Content {
775                                    role: "function".to_string(),
776                                    parts: vec![Part::FunctionResponse {
777                                        function_response: adk_core::FunctionResponseData {
778                                            name: name.clone(),
779                                            response: result.clone(),
780                                        },
781                                        id: Some(call_id.clone()),
782                                    }],
783                                });
784                                yield Ok(tool_event);
785
786                                // Check for escalation
787                                if actions.escalate || actions.skip_summarization {
788                                    let _ = session.close().await;
789                                    return;
790                                }
791
792                                // Send tool response back to session
793                                let response = ToolResponse {
794                                    call_id,
795                                    output: result,
796                                };
797                                if let Err(e) = session.send_tool_response(response).await {
798                                    yield Err(AdkError::model(format!("Failed to send tool response: {}", e)));
799                                    let _ = session.close().await;
800                                    return;
801                                }
802                            }
803
804                            ServerEvent::ResponseDone { .. } => {
805                                // Response complete, continue listening
806                            }
807
808                            ServerEvent::Error { error, .. } => {
809                                yield Err(AdkError::model(format!(
810                                    "Realtime error: {} - {}",
811                                    error.code.unwrap_or_default(),
812                                    error.message
813                                )));
814                            }
815
816
817                            _ => {
818                                // Ignore other events
819                            }
820                        }
821                    }
822                    Some(Err(e)) => {
823                        yield Err(AdkError::model(format!("Session error: {}", e)));
824                        break;
825                    }
826                    None => {
827                        // Session closed
828                        break;
829                    }
830                }
831            }
832
833            // ===== AFTER AGENT CALLBACKS =====
834            for callback in after_callbacks.as_ref() {
835                match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
836                    Ok(Some(content)) => {
837                        let mut after_event = Event::new(&invocation_id);
838                        after_event.author = agent_name.clone();
839                        after_event.llm_response.content = Some(content);
840                        yield Ok(after_event);
841                        break;
842                    }
843                    Ok(None) => continue,
844                    Err(e) => {
845                        yield Err(e);
846                        return;
847                    }
848                }
849            }
850        };
851
852        Ok(Box::pin(s))
853    }
854}
855
856/// Tool context for realtime agent tool execution.
857struct RealtimeToolContext {
858    parent_ctx: Arc<dyn InvocationContext>,
859    function_call_id: String,
860    actions: Mutex<EventActions>,
861}
862
863impl RealtimeToolContext {
864    fn new(parent_ctx: Arc<dyn InvocationContext>, function_call_id: String) -> Self {
865        Self { parent_ctx, function_call_id, actions: Mutex::new(EventActions::default()) }
866    }
867}
868
869#[async_trait]
870impl ReadonlyContext for RealtimeToolContext {
871    fn invocation_id(&self) -> &str {
872        self.parent_ctx.invocation_id()
873    }
874
875    fn agent_name(&self) -> &str {
876        self.parent_ctx.agent_name()
877    }
878
879    fn user_id(&self) -> &str {
880        self.parent_ctx.user_id()
881    }
882
883    fn app_name(&self) -> &str {
884        self.parent_ctx.app_name()
885    }
886
887    fn session_id(&self) -> &str {
888        self.parent_ctx.session_id()
889    }
890
891    fn branch(&self) -> &str {
892        self.parent_ctx.branch()
893    }
894
895    fn user_content(&self) -> &Content {
896        self.parent_ctx.user_content()
897    }
898}
899
900#[async_trait]
901impl CallbackContext for RealtimeToolContext {
902    fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
903        self.parent_ctx.artifacts()
904    }
905}
906
907#[async_trait]
908impl ToolContext for RealtimeToolContext {
909    fn function_call_id(&self) -> &str {
910        &self.function_call_id
911    }
912
913    fn actions(&self) -> EventActions {
914        self.actions.lock().unwrap().clone()
915    }
916
917    fn set_actions(&self, actions: EventActions) {
918        *self.actions.lock().unwrap() = actions;
919    }
920
921    async fn search_memory(&self, query: &str) -> Result<Vec<MemoryEntry>> {
922        if let Some(memory) = self.parent_ctx.memory() {
923            memory.search(query).await
924        } else {
925            Ok(vec![])
926        }
927    }
928}