1use crate::config::{RealtimeConfig, ToolDefinition, VadConfig, VadMode};
56use crate::events::{ServerEvent, ToolResponse};
57use adk_core::{
58 AdkError, AfterAgentCallback, AfterToolCallback, Agent, BeforeAgentCallback,
59 BeforeToolCallback, CallbackContext, Content, Event, EventActions, EventStream,
60 GlobalInstructionProvider, InstructionProvider, InvocationContext, MemoryEntry, Part,
61 ReadonlyContext, Result, Tool, ToolCallbackContext, ToolContext, Toolset,
62};
63use async_stream::stream;
64use async_trait::async_trait;
65
66use std::sync::{Arc, Mutex};
67
68pub type BoxedRealtimeModel = Arc<dyn crate::model::RealtimeModel>;
70
71pub struct RealtimeAgent {
77 name: String,
78 description: String,
79 model: BoxedRealtimeModel,
80
81 instruction: Option<String>,
83 instruction_provider: Option<Arc<InstructionProvider>>,
84 global_instruction: Option<String>,
85 global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
86
87 voice: Option<String>,
89 vad_config: Option<VadConfig>,
90 modalities: Vec<String>,
91
92 tools: Vec<Arc<dyn Tool>>,
94 toolsets: Vec<Arc<dyn Toolset>>,
95 sub_agents: Vec<Arc<dyn Agent>>,
96
97 before_callbacks: Arc<Vec<BeforeAgentCallback>>,
99 after_callbacks: Arc<Vec<AfterAgentCallback>>,
100 before_tool_callbacks: Arc<Vec<BeforeToolCallback>>,
101 after_tool_callbacks: Arc<Vec<AfterToolCallback>>,
102
103 on_audio: Option<AudioCallback>,
105 on_transcript: Option<TranscriptCallback>,
106 on_speech_started: Option<SpeechCallback>,
107 on_speech_stopped: Option<SpeechCallback>,
108
109 #[cfg(feature = "video-avatar")]
111 avatar_config: Option<crate::avatar::AvatarConfig>,
112
113 #[cfg(feature = "video-avatar")]
115 avatar_provider: Option<std::sync::Arc<dyn crate::avatar::AvatarProvider>>,
116}
117
/// Shared async callback for audio output.
///
/// The agent calls it with the raw bytes of an audio delta and the id of the item the
/// delta belongs to, then awaits the returned future.
pub type AudioCallback = Arc<
    dyn for<'a, 'b> Fn(
            &'a [u8],
            &'b str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
124
/// Shared async callback for transcript output.
///
/// The agent calls it with the text of a transcript delta and the id of the item the
/// delta belongs to, then awaits the returned future.
pub type TranscriptCallback = Arc<
    dyn for<'a, 'b> Fn(
            &'a str,
            &'b str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
131
/// Shared async callback for speech-boundary notifications.
///
/// The agent calls it with the `audio_start_ms` / `audio_end_ms` value carried by the
/// speech-started / speech-stopped server event, then awaits the returned future.
pub type SpeechCallback = Arc<
    dyn Fn(u64) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
136
137impl std::fmt::Debug for RealtimeAgent {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("RealtimeAgent")
140 .field("name", &self.name)
141 .field("description", &self.description)
142 .field("model", &self.model.model_id())
143 .field("voice", &self.voice)
144 .field("tools_count", &self.tools.len())
145 .field("toolsets_count", &self.toolsets.len())
146 .field("sub_agents_count", &self.sub_agents.len())
147 .finish()
148 }
149}
150
151pub struct RealtimeAgentBuilder {
153 name: String,
154 description: Option<String>,
155 model: Option<BoxedRealtimeModel>,
156 instruction: Option<String>,
157 instruction_provider: Option<Arc<InstructionProvider>>,
158 global_instruction: Option<String>,
159 global_instruction_provider: Option<Arc<GlobalInstructionProvider>>,
160 voice: Option<String>,
161 vad_config: Option<VadConfig>,
162 modalities: Vec<String>,
163 tools: Vec<Arc<dyn Tool>>,
164 toolsets: Vec<Arc<dyn Toolset>>,
165 sub_agents: Vec<Arc<dyn Agent>>,
166 before_callbacks: Vec<BeforeAgentCallback>,
167 after_callbacks: Vec<AfterAgentCallback>,
168 before_tool_callbacks: Vec<BeforeToolCallback>,
169 after_tool_callbacks: Vec<AfterToolCallback>,
170 on_audio: Option<AudioCallback>,
171 on_transcript: Option<TranscriptCallback>,
172 on_speech_started: Option<SpeechCallback>,
173 on_speech_stopped: Option<SpeechCallback>,
174
175 #[cfg(feature = "video-avatar")]
176 avatar_config: Option<crate::avatar::AvatarConfig>,
177
178 #[cfg(feature = "video-avatar")]
179 avatar_provider: Option<std::sync::Arc<dyn crate::avatar::AvatarProvider>>,
180}
181
182impl RealtimeAgentBuilder {
183 pub fn new(name: impl Into<String>) -> Self {
185 Self {
186 name: name.into(),
187 description: None,
188 model: None,
189 instruction: None,
190 instruction_provider: None,
191 global_instruction: None,
192 global_instruction_provider: None,
193 voice: None,
194 vad_config: None,
195 modalities: vec!["text".to_string(), "audio".to_string()],
196 tools: Vec::new(),
197 toolsets: Vec::new(),
198 sub_agents: Vec::new(),
199 before_callbacks: Vec::new(),
200 after_callbacks: Vec::new(),
201 before_tool_callbacks: Vec::new(),
202 after_tool_callbacks: Vec::new(),
203 on_audio: None,
204 on_transcript: None,
205 on_speech_started: None,
206 on_speech_stopped: None,
207 #[cfg(feature = "video-avatar")]
208 avatar_config: None,
209 #[cfg(feature = "video-avatar")]
210 avatar_provider: None,
211 }
212 }
213
214 pub fn description(mut self, desc: impl Into<String>) -> Self {
216 self.description = Some(desc.into());
217 self
218 }
219
220 pub fn model(mut self, model: BoxedRealtimeModel) -> Self {
222 self.model = Some(model);
223 self
224 }
225
226 pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
228 self.instruction = Some(instruction.into());
229 self
230 }
231
232 pub fn instruction_provider(mut self, provider: InstructionProvider) -> Self {
234 self.instruction_provider = Some(Arc::new(provider));
235 self
236 }
237
238 pub fn global_instruction(mut self, instruction: impl Into<String>) -> Self {
240 self.global_instruction = Some(instruction.into());
241 self
242 }
243
244 pub fn global_instruction_provider(mut self, provider: GlobalInstructionProvider) -> Self {
246 self.global_instruction_provider = Some(Arc::new(provider));
247 self
248 }
249
250 pub fn voice(mut self, voice: impl Into<String>) -> Self {
252 self.voice = Some(voice.into());
253 self
254 }
255
256 pub fn vad(mut self, config: VadConfig) -> Self {
258 self.vad_config = Some(config);
259 self
260 }
261
262 pub fn server_vad(mut self) -> Self {
264 self.vad_config = Some(VadConfig {
265 mode: VadMode::ServerVad,
266 threshold: Some(0.5),
267 prefix_padding_ms: Some(300),
268 silence_duration_ms: Some(500),
269 interrupt_response: Some(true),
270 eagerness: None,
271 });
272 self
273 }
274
275 pub fn modalities(mut self, modalities: Vec<String>) -> Self {
277 self.modalities = modalities;
278 self
279 }
280
281 pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
283 self.tools.push(tool);
284 self
285 }
286
287 pub fn toolset(mut self, toolset: Arc<dyn Toolset>) -> Self {
293 self.toolsets.push(toolset);
294 self
295 }
296
297 pub fn sub_agent(mut self, agent: Arc<dyn Agent>) -> Self {
299 self.sub_agents.push(agent);
300 self
301 }
302
303 pub fn before_agent_callback(mut self, callback: BeforeAgentCallback) -> Self {
305 self.before_callbacks.push(callback);
306 self
307 }
308
309 pub fn after_agent_callback(mut self, callback: AfterAgentCallback) -> Self {
311 self.after_callbacks.push(callback);
312 self
313 }
314
315 pub fn before_tool_callback(mut self, callback: BeforeToolCallback) -> Self {
317 self.before_tool_callbacks.push(callback);
318 self
319 }
320
321 pub fn after_tool_callback(mut self, callback: AfterToolCallback) -> Self {
323 self.after_tool_callbacks.push(callback);
324 self
325 }
326
327 pub fn on_audio(mut self, callback: AudioCallback) -> Self {
329 self.on_audio = Some(callback);
330 self
331 }
332
333 pub fn on_transcript(mut self, callback: TranscriptCallback) -> Self {
335 self.on_transcript = Some(callback);
336 self
337 }
338
339 pub fn on_speech_started(mut self, callback: SpeechCallback) -> Self {
341 self.on_speech_started = Some(callback);
342 self
343 }
344
345 pub fn on_speech_stopped(mut self, callback: SpeechCallback) -> Self {
347 self.on_speech_stopped = Some(callback);
348 self
349 }
350
351 #[cfg(feature = "video-avatar")]
359 pub fn avatar(mut self, config: crate::avatar::AvatarConfig) -> Self {
360 self.avatar_config = Some(config);
361 self
362 }
363
364 #[cfg(feature = "video-avatar")]
385 pub fn avatar_provider(
386 mut self,
387 provider: std::sync::Arc<dyn crate::avatar::AvatarProvider>,
388 ) -> Self {
389 self.avatar_provider = Some(provider);
390 self
391 }
392
393 pub fn build(self) -> Result<RealtimeAgent> {
395 let model =
396 self.model.ok_or_else(|| AdkError::agent("RealtimeModel is required".to_string()))?;
397
398 Ok(RealtimeAgent {
399 name: self.name,
400 description: self.description.unwrap_or_default(),
401 model,
402 instruction: self.instruction,
403 instruction_provider: self.instruction_provider,
404 global_instruction: self.global_instruction,
405 global_instruction_provider: self.global_instruction_provider,
406 voice: self.voice,
407 vad_config: self.vad_config,
408 modalities: self.modalities,
409 tools: self.tools,
410 toolsets: self.toolsets,
411 sub_agents: self.sub_agents,
412 before_callbacks: Arc::new(self.before_callbacks),
413 after_callbacks: Arc::new(self.after_callbacks),
414 before_tool_callbacks: Arc::new(self.before_tool_callbacks),
415 after_tool_callbacks: Arc::new(self.after_tool_callbacks),
416 on_audio: self.on_audio,
417 on_transcript: self.on_transcript,
418 on_speech_started: self.on_speech_started,
419 on_speech_stopped: self.on_speech_stopped,
420 #[cfg(feature = "video-avatar")]
421 avatar_config: self.avatar_config,
422 #[cfg(feature = "video-avatar")]
423 avatar_provider: self.avatar_provider,
424 })
425 }
426}
427
428impl RealtimeAgent {
429 pub fn builder(name: impl Into<String>) -> RealtimeAgentBuilder {
431 RealtimeAgentBuilder::new(name)
432 }
433
434 pub fn instruction(&self) -> Option<&String> {
436 self.instruction.as_ref()
437 }
438
439 pub fn voice(&self) -> Option<&String> {
441 self.voice.as_ref()
442 }
443
444 pub fn vad_config(&self) -> Option<&VadConfig> {
446 self.vad_config.as_ref()
447 }
448
449 pub fn tools(&self) -> &[Arc<dyn Tool>] {
451 &self.tools
452 }
453
454 #[cfg(feature = "video-avatar")]
458 pub fn avatar_config(&self) -> Option<&crate::avatar::AvatarConfig> {
459 self.avatar_config.as_ref()
460 }
461
462 #[cfg(feature = "video-avatar")]
466 pub fn avatar_provider(&self) -> Option<&std::sync::Arc<dyn crate::avatar::AvatarProvider>> {
467 self.avatar_provider.as_ref()
468 }
469
470 async fn build_config(
472 &self,
473 ctx: &Arc<dyn InvocationContext>,
474 resolved_tools: &[Arc<dyn Tool>],
475 ) -> Result<RealtimeConfig> {
476 let mut config = RealtimeConfig::default();
477
478 if let Some(provider) = &self.global_instruction_provider {
480 let global_inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
481 if !global_inst.is_empty() {
482 config.instruction = Some(global_inst);
483 }
484 } else if let Some(ref template) = self.global_instruction {
485 let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
486 config.instruction = Some(processed);
487 }
488
489 if let Some(provider) = &self.instruction_provider {
491 let inst = provider(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
492 if !inst.is_empty() {
493 if let Some(existing) = &mut config.instruction {
494 existing.push_str("\n\n");
495 existing.push_str(&inst);
496 } else {
497 config.instruction = Some(inst);
498 }
499 }
500 } else if let Some(ref template) = self.instruction {
501 let processed = adk_core::inject_session_state(ctx.as_ref(), template).await?;
502 if let Some(existing) = &mut config.instruction {
503 existing.push_str("\n\n");
504 existing.push_str(&processed);
505 } else {
506 config.instruction = Some(processed);
507 }
508 }
509
510 config.voice = self.voice.clone();
512 config.turn_detection = self.vad_config.clone();
513 config.modalities = Some(self.modalities.clone());
514
515 let tool_defs: Vec<ToolDefinition> = resolved_tools
517 .iter()
518 .map(|t| ToolDefinition {
519 name: t.name().to_string(),
520 description: Some(t.enhanced_description().to_string()),
521 parameters: t.parameters_schema(),
522 })
523 .collect();
524
525 if !tool_defs.is_empty() {
526 config.tools = Some(tool_defs);
527 }
528
529 if !self.sub_agents.is_empty() {
531 let mut tools = config.tools.unwrap_or_default();
532 tools.push(ToolDefinition {
533 name: "transfer_to_agent".to_string(),
534 description: Some("Transfer execution to another agent.".to_string()),
535 parameters: Some(serde_json::json!({
536 "type": "object",
537 "properties": {
538 "agent_name": {
539 "type": "string",
540 "description": "The name of the agent to transfer to."
541 }
542 },
543 "required": ["agent_name"]
544 })),
545 });
546 config.tools = Some(tools);
547 }
548
549 #[cfg(feature = "video-avatar")]
554 if let Some(ref avatar) = self.avatar_config {
555 tracing::warn!(
556 agent = %self.name,
557 source_url = %avatar.source_url,
558 "video avatar configured but the current realtime provider does not support video avatars; proceeding audio-only"
559 );
560 let avatar_json = serde_json::to_value(avatar).unwrap_or_else(|e| {
561 tracing::warn!("failed to serialize avatar config: {e}");
562 serde_json::Value::Null
563 });
564 let extra = config.extra.get_or_insert_with(|| serde_json::json!({}));
565 if let Some(obj) = extra.as_object_mut() {
566 obj.insert("avatarConfig".to_string(), avatar_json);
567 }
568 }
569
570 Ok(config)
571 }
572
573 #[allow(dead_code)]
575 async fn execute_tool(
576 &self,
577 ctx: &Arc<dyn InvocationContext>,
578 call_id: &str,
579 name: &str,
580 arguments: &str,
581 ) -> (serde_json::Value, EventActions) {
582 let tool = self.tools.iter().find(|t| t.name() == name);
584
585 if let Some(tool) = tool {
586 let args: serde_json::Value =
587 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
588
589 let tool_ctx: Arc<dyn ToolContext> =
591 Arc::new(RealtimeToolContext::new(ctx.clone(), call_id.to_string()));
592
593 let tool_cb_ctx =
595 Arc::new(ToolCallbackContext::new(ctx.clone(), name.to_string(), args.clone()));
596 for callback in self.before_tool_callbacks.as_ref() {
597 if let Err(e) = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await {
598 return (
599 serde_json::json!({ "error": e.to_string() }),
600 EventActions::default(),
601 );
602 }
603 }
604
605 let result = match tool.execute(tool_ctx.clone(), args.clone()).await {
607 Ok(result) => result,
608 Err(e) => serde_json::json!({ "error": e.to_string() }),
609 };
610
611 let actions = tool_ctx.actions();
612
613 let tool_cb_ctx =
615 Arc::new(ToolCallbackContext::new(ctx.clone(), name.to_string(), args.clone()));
616 for callback in self.after_tool_callbacks.as_ref() {
617 if let Err(e) = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await {
618 return (serde_json::json!({ "error": e.to_string() }), actions);
619 }
620 }
621
622 (result, actions)
623 } else {
624 (
625 serde_json::json!({ "error": format!("Tool {} not found", name) }),
626 EventActions::default(),
627 )
628 }
629 }
630}
631
632#[async_trait]
633impl Agent for RealtimeAgent {
634 fn name(&self) -> &str {
635 &self.name
636 }
637
638 fn description(&self) -> &str {
639 &self.description
640 }
641
642 fn sub_agents(&self) -> &[Arc<dyn Agent>] {
643 &self.sub_agents
644 }
645
646 async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
647 let agent_name = self.name.clone();
648 let invocation_id = ctx.invocation_id().to_string();
649 let model = self.model.clone();
650 let _sub_agents = self.sub_agents.clone();
651
652 let before_callbacks = self.before_callbacks.clone();
654 let after_callbacks = self.after_callbacks.clone();
655 let before_tool_callbacks = self.before_tool_callbacks.clone();
656 let after_tool_callbacks = self.after_tool_callbacks.clone();
657 let tools = self.tools.clone();
658 let toolsets = self.toolsets.clone();
659
660 let on_audio = self.on_audio.clone();
662 let on_transcript = self.on_transcript.clone();
663 let on_speech_started = self.on_speech_started.clone();
664 let on_speech_stopped = self.on_speech_stopped.clone();
665
666 #[cfg(feature = "video-avatar")]
668 let avatar_provider = self.avatar_provider.clone();
669 #[cfg(feature = "video-avatar")]
670 let avatar_config_for_session = self.avatar_config.clone();
671
672 let mut resolved_tools: Vec<Arc<dyn Tool>> = tools.clone();
674 let static_tool_names: std::collections::HashSet<String> =
675 tools.iter().map(|t| t.name().to_string()).collect();
676 let mut toolset_source: std::collections::HashMap<String, String> =
677 std::collections::HashMap::new();
678
679 for toolset in &toolsets {
680 let toolset_tools = toolset.tools(ctx.clone() as Arc<dyn ReadonlyContext>).await?;
681 for tool in &toolset_tools {
682 let name = tool.name().to_string();
683 if static_tool_names.contains(&name) {
684 return Err(AdkError::agent(format!(
685 "Duplicate tool name '{}': conflict between static tool and toolset '{}'",
686 name,
687 toolset.name()
688 )));
689 }
690 if let Some(other_toolset_name) = toolset_source.get(&name) {
691 return Err(AdkError::agent(format!(
692 "Duplicate tool name '{}': conflict between toolset '{}' and toolset '{}'",
693 name,
694 other_toolset_name,
695 toolset.name()
696 )));
697 }
698 toolset_source.insert(name, toolset.name().to_string());
699 resolved_tools.push(tool.clone());
700 }
701 }
702
703 let config = self.build_config(&ctx, &resolved_tools).await?;
705
706 let s = stream! {
707 for callback in before_callbacks.as_ref() {
709 match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
710 Ok(Some(content)) => {
711 let mut early_event = Event::new(&invocation_id);
712 early_event.author = agent_name.clone();
713 early_event.llm_response.content = Some(content);
714 yield Ok(early_event);
715 return;
716 }
717 Ok(None) => continue,
718 Err(e) => {
719 yield Err(e);
720 return;
721 }
722 }
723 }
724
725 let session = match model.connect(config).await {
727 Ok(s) => s,
728 Err(e) => {
729 yield Err(AdkError::model(format!("Failed to connect: {}", e)));
730 return;
731 }
732 };
733
734 let mut start_event = Event::new(&invocation_id);
736 start_event.author = agent_name.clone();
737 start_event.llm_response.content = Some(Content {
738 role: "system".to_string(),
739 parts: vec![Part::Text {
740 text: format!("Realtime session started: {}", session.session_id()),
741 }],
742 });
743 yield Ok(start_event);
744
745 #[cfg(feature = "video-avatar")]
747 let avatar_session_id: Option<String> = {
748 if let (Some(provider), Some(config)) = (&avatar_provider, &avatar_config_for_session) {
749 match provider.start_session(config).await {
750 Ok(session_info) => {
751 tracing::info!(
752 provider = %session_info.provider,
753 session_id = %session_info.session_id,
754 "avatar session started"
755 );
756 let mut avatar_event = Event::new(&invocation_id);
758 avatar_event.author = agent_name.clone();
759 avatar_event.llm_response.content = Some(Content {
760 role: "system".to_string(),
761 parts: vec![Part::Text {
762 text: serde_json::to_string(&session_info).unwrap_or_default(),
763 }],
764 });
765 yield Ok(avatar_event);
766 Some(session_info.session_id)
767 }
768 Err(e) => {
769 tracing::warn!(
771 error = %e,
772 "avatar session creation failed, falling back to audio-only"
773 );
774 None
775 }
776 }
777 } else {
778 None
779 }
780 };
781 #[cfg(not(feature = "video-avatar"))]
782 let _avatar_session_id: Option<String> = None;
783
784 #[cfg(feature = "video-avatar")]
786 let _avatar_keep_alive_handle: Option<tokio::task::JoinHandle<()>> = {
787 if let (Some(provider), Some(sess_id)) = (&avatar_provider, &avatar_session_id) {
788 Some(crate::avatar::spawn_keep_alive(
789 provider.clone(),
790 sess_id.clone(),
791 std::time::Duration::from_secs(30),
792 ))
793 } else {
794 None
795 }
796 };
797
798 let user_content = ctx.user_content();
801 for part in &user_content.parts {
802 if let Part::Text { text } = part {
803 if let Err(e) = session.send_text(text).await {
804 yield Err(AdkError::model(format!("Failed to send text: {}", e)));
805 return;
806 }
807 if let Err(e) = session.create_response().await {
809 yield Err(AdkError::model(format!("Failed to create response: {}", e)));
810 return;
811 }
812 }
813 }
814
815 loop {
817 let event = session.next_event().await;
818
819 match event {
820 Some(Ok(server_event)) => {
821 match server_event {
822 ServerEvent::AudioDelta { delta, item_id, .. } => {
823 #[cfg(feature = "video-avatar")]
825 if let (Some(provider), Some(sess_id)) = (&avatar_provider, &avatar_session_id) {
826 if let Err(e) = provider.send_audio(sess_id, &delta).await {
827 tracing::warn!(error = %e, "avatar send_audio failed");
828 }
829 if let Some(ref cb) = on_audio {
832 cb(&delta, &item_id).await;
833 }
834 continue;
835 }
836
837 if let Some(ref cb) = on_audio {
839 cb(&delta, &item_id).await;
840 }
841
842 let mut audio_event = Event::new(&invocation_id);
844 audio_event.author = agent_name.clone();
845 audio_event.llm_response.content = Some(Content {
846 role: "model".to_string(),
847 parts: vec![Part::InlineData {
848 mime_type: "audio/pcm".to_string(),
849 data: delta,
850 }],
851 });
852 yield Ok(audio_event);
853 }
854
855 ServerEvent::TextDelta { delta, .. } => {
856 let mut text_event = Event::new(&invocation_id);
857 text_event.author = agent_name.clone();
858 text_event.llm_response.content = Some(Content {
859 role: "model".to_string(),
860 parts: vec![Part::Text { text: delta.clone() }],
861 });
862 yield Ok(text_event);
863 }
864
865 ServerEvent::TranscriptDelta { delta, item_id, .. } => {
866 if let Some(ref cb) = on_transcript {
867 cb(&delta, &item_id).await;
868 }
869 }
870
871 ServerEvent::SpeechStarted { audio_start_ms, .. } => {
872 if let Some(ref cb) = on_speech_started {
873 cb(audio_start_ms).await;
874 }
875 }
876
877 ServerEvent::SpeechStopped { audio_end_ms, .. } => {
878 if let Some(ref cb) = on_speech_stopped {
879 cb(audio_end_ms).await;
880 }
881 }
882
883 ServerEvent::FunctionCallDone {
884 call_id,
885 name,
886 arguments,
887 ..
888 } => {
889 if name == "transfer_to_agent" {
891 let args: serde_json::Value = serde_json::from_str(&arguments)
892 .unwrap_or(serde_json::json!({}));
893 let target = args.get("agent_name")
894 .and_then(|v| v.as_str())
895 .unwrap_or_default()
896 .to_string();
897
898 let mut transfer_event = Event::new(&invocation_id);
899 transfer_event.author = agent_name.clone();
900 transfer_event.actions.transfer_to_agent = Some(target);
901 yield Ok(transfer_event);
902
903 let _ = session.close().await;
904 return;
905 }
906
907 let tool = resolved_tools.iter().find(|t| t.name() == name);
909
910 let (result, actions) = if let Some(tool) = tool {
911 let args: serde_json::Value = serde_json::from_str(&arguments)
912 .unwrap_or(serde_json::json!({}));
913
914 let tool_ctx: Arc<dyn ToolContext> = Arc::new(
915 RealtimeToolContext::new(ctx.clone(), call_id.clone())
916 );
917
918 let tool_cb_ctx = Arc::new(ToolCallbackContext::new(
920 ctx.clone(),
921 name.clone(),
922 args.clone(),
923 ));
924 for callback in before_tool_callbacks.as_ref() {
925 if let Err(e) = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await {
926 let error_result = serde_json::json!({ "error": e.to_string() });
927 (error_result, EventActions::default())
928 } else {
929 continue;
930 };
931 }
932
933 let result = match tool.execute(tool_ctx.clone(), args.clone()).await {
934 Ok(r) => r,
935 Err(e) => serde_json::json!({ "error": e.to_string() }),
936 };
937
938 let actions = tool_ctx.actions();
939
940 let tool_cb_ctx = Arc::new(ToolCallbackContext::new(
942 ctx.clone(),
943 name.clone(),
944 args.clone(),
945 ));
946 for callback in after_tool_callbacks.as_ref() {
947 let _ = callback(tool_cb_ctx.clone() as Arc<dyn CallbackContext>).await;
948 }
949
950 (result, actions)
951 } else {
952 (
953 serde_json::json!({ "error": format!("Tool {} not found", name) }),
954 EventActions::default(),
955 )
956 };
957
958 let mut tool_event = Event::new(&invocation_id);
960 tool_event.author = agent_name.clone();
961 tool_event.actions = actions.clone();
962 tool_event.llm_response.content = Some(Content {
963 role: "function".to_string(),
964 parts: vec![Part::FunctionResponse {
965 function_response: adk_core::FunctionResponseData::new(name.clone(), result.clone()),
966 id: Some(call_id.clone()),
967 }],
968 });
969 yield Ok(tool_event);
970
971 if actions.escalate || actions.skip_summarization {
973 let _ = session.close().await;
974 return;
975 }
976
977 let response = ToolResponse {
979 call_id,
980 output: result,
981 };
982 if let Err(e) = session.send_tool_response(response).await {
983 yield Err(AdkError::model(format!("Failed to send tool response: {}", e)));
984 let _ = session.close().await;
985 return;
986 }
987 }
988
989 ServerEvent::ResponseDone { .. } => {
990 }
992
993 ServerEvent::Error { error, .. } => {
994 yield Err(AdkError::model(format!(
995 "Realtime error: {} - {}",
996 error.code.unwrap_or_default(),
997 error.message
998 )));
999 }
1000
1001
1002 _ => {
1003 }
1005 }
1006 }
1007 Some(Err(e)) => {
1008 yield Err(AdkError::model(format!("Session error: {}", e)));
1009 break;
1010 }
1011 None => {
1012 break;
1014 }
1015 }
1016 }
1017
1018 #[cfg(feature = "video-avatar")]
1020 {
1021 if let Some(handle) = _avatar_keep_alive_handle {
1023 handle.abort();
1024 }
1025 if let (Some(provider), Some(sess_id)) = (&avatar_provider, &avatar_session_id) {
1027 if let Err(e) = provider.stop_session(sess_id).await {
1028 tracing::warn!(error = %e, "avatar session cleanup failed");
1029 }
1030 }
1031 }
1032
1033 for callback in after_callbacks.as_ref() {
1035 match callback(ctx.clone() as Arc<dyn CallbackContext>).await {
1036 Ok(Some(content)) => {
1037 let mut after_event = Event::new(&invocation_id);
1038 after_event.author = agent_name.clone();
1039 after_event.llm_response.content = Some(content);
1040 yield Ok(after_event);
1041 break;
1042 }
1043 Ok(None) => continue,
1044 Err(e) => {
1045 yield Err(e);
1046 return;
1047 }
1048 }
1049 }
1050 };
1051
1052 Ok(Box::pin(s))
1053 }
1054}
1055
1056struct RealtimeToolContext {
1058 parent_ctx: Arc<dyn InvocationContext>,
1059 function_call_id: String,
1060 actions: Mutex<EventActions>,
1061}
1062
1063impl RealtimeToolContext {
1064 fn new(parent_ctx: Arc<dyn InvocationContext>, function_call_id: String) -> Self {
1065 Self { parent_ctx, function_call_id, actions: Mutex::new(EventActions::default()) }
1066 }
1067}
1068
1069#[async_trait]
1070impl ReadonlyContext for RealtimeToolContext {
1071 fn invocation_id(&self) -> &str {
1072 self.parent_ctx.invocation_id()
1073 }
1074
1075 fn agent_name(&self) -> &str {
1076 self.parent_ctx.agent_name()
1077 }
1078
1079 fn user_id(&self) -> &str {
1080 self.parent_ctx.user_id()
1081 }
1082
1083 fn app_name(&self) -> &str {
1084 self.parent_ctx.app_name()
1085 }
1086
1087 fn session_id(&self) -> &str {
1088 self.parent_ctx.session_id()
1089 }
1090
1091 fn branch(&self) -> &str {
1092 self.parent_ctx.branch()
1093 }
1094
1095 fn user_content(&self) -> &Content {
1096 self.parent_ctx.user_content()
1097 }
1098}
1099
1100#[async_trait]
1101impl CallbackContext for RealtimeToolContext {
1102 fn artifacts(&self) -> Option<Arc<dyn adk_core::Artifacts>> {
1103 self.parent_ctx.artifacts()
1104 }
1105}
1106
1107#[async_trait]
1108impl ToolContext for RealtimeToolContext {
1109 fn function_call_id(&self) -> &str {
1110 &self.function_call_id
1111 }
1112
1113 fn actions(&self) -> EventActions {
1114 self.actions.lock().unwrap().clone()
1115 }
1116
1117 fn set_actions(&self, actions: EventActions) {
1118 *self.actions.lock().unwrap() = actions;
1119 }
1120
1121 async fn search_memory(&self, query: &str) -> Result<Vec<MemoryEntry>> {
1122 if let Some(memory) = self.parent_ctx.memory() {
1123 memory.search(query).await
1124 } else {
1125 Ok(vec![])
1126 }
1127 }
1128}