adk_realtime/runner.rs
1//! RealtimeRunner for integrating realtime sessions with agents.
2//!
3//! This module provides the bridge between realtime audio sessions and
4//! the ADK agent framework, handling tool execution and event routing.
5
6use crate::config::{RealtimeConfig, SessionUpdateConfig, ToolDefinition};
7use crate::error::{RealtimeError, Result};
8use crate::events::{ServerEvent, ToolCall, ToolResponse};
9use crate::model::BoxedModel;
10use crate::session::ContextMutationOutcome;
11use async_trait::async_trait;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// Internal state machine tracking the resumability status of the RealtimeRunner.
17#[derive(Debug, Clone, Default, PartialEq)]
18pub enum RunnerState {
19 /// Runner is ready to accept transport resumption immediately.
20 #[default]
21 Idle,
22 /// Model is currently generating a response; tearing down the connection would corrupt context.
23 Generating,
24 /// A tool is currently executing; teardown would cause tool loss.
25 ExecutingTool,
26 /// A context mutation was queued while the runner was busy, and must be executed once Idle.
27 ///
28 /// **Provider Context:** This state is only utilized by providers that do *not* support
29 /// native mid-flight mutability (e.g., Gemini Live), requiring a physical transport teardown
30 /// and rebuild (Phantom Reconnect). Providers like OpenAI natively apply `session.update`
31 /// frames instantly and will never enter this queued state.
32 ///
33 /// **Queue Policy:** The runner keeps only one pending resumption. If a new session update
34 /// arrives while a resumption is already pending, the previous pending resumption is replaced.
35 /// This is intentional: pending session updates represent desired end state, not an ordered
36 /// command queue. The policy is last write wins.
37 PendingResumption {
38 /// The new configuration to apply on reconnection.
39 config: Box<crate::config::RealtimeConfig>,
40 /// An optional message to inject immediately after resumption.
41 bridge_message: Option<String>,
42 /// Number of failed reconnection attempts for this mutation.
43 attempts: u8,
44 },
45}
46
47/// Handler for tool/function calls from the realtime model.
48#[async_trait]
49pub trait ToolHandler: Send + Sync {
50 /// Execute a tool call and return the result.
51 async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value>;
52}
53
54/// A simple function-based tool handler.
55pub struct FnToolHandler<F>
56where
57 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
58{
59 handler: F,
60}
61
62impl<F> FnToolHandler<F>
63where
64 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
65{
66 /// Create a new function-based tool handler.
67 pub fn new(handler: F) -> Self {
68 Self { handler }
69 }
70}
71
72#[async_trait]
73impl<F> ToolHandler for FnToolHandler<F>
74where
75 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
76{
77 async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value> {
78 (self.handler)(call)
79 }
80}
81
82/// Async function-based tool handler.
83#[allow(dead_code)]
84pub struct AsyncToolHandler<F, Fut>
85where
86 F: Fn(ToolCall) -> Fut + Send + Sync,
87 Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
88{
89 handler: F,
90}
91
92impl<F, Fut> AsyncToolHandler<F, Fut>
93where
94 F: Fn(ToolCall) -> Fut + Send + Sync,
95 Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
96{
97 /// Create a new async tool handler.
98 pub fn new(handler: F) -> Self {
99 Self { handler }
100 }
101}
102
103/// Event handler for processing realtime events.
104#[async_trait]
105pub trait EventHandler: Send + Sync {
106 /// Called when an audio delta is received (raw PCM bytes).
107 async fn on_audio(&self, _audio: &[u8], _item_id: &str) -> Result<()> {
108 Ok(())
109 }
110
111 /// Called when a text delta is received.
112 async fn on_text(&self, _text: &str, _item_id: &str) -> Result<()> {
113 Ok(())
114 }
115
116 /// Called when a transcript delta is received.
117 async fn on_transcript(&self, _transcript: &str, _item_id: &str) -> Result<()> {
118 Ok(())
119 }
120
121 /// Called when speech is detected.
122 async fn on_speech_started(&self, _audio_start_ms: u64) -> Result<()> {
123 Ok(())
124 }
125
126 /// Called when speech ends.
127 async fn on_speech_stopped(&self, _audio_end_ms: u64) -> Result<()> {
128 Ok(())
129 }
130
131 /// Called when a response completes.
132 async fn on_response_done(&self) -> Result<()> {
133 Ok(())
134 }
135
136 /// Called on any error.
137 async fn on_error(&self, _error: &RealtimeError) -> Result<()> {
138 Ok(())
139 }
140}
141
142/// Default no-op event handler.
143#[derive(Debug, Clone, Default)]
144pub struct NoOpEventHandler;
145
146#[async_trait]
147impl EventHandler for NoOpEventHandler {}
148
/// Behavioural switches for the [`RealtimeRunner`] event loop.
#[derive(Clone)]
pub struct RunnerConfig {
    /// When `true`, tool calls emitted by the model are executed automatically.
    pub auto_execute_tools: bool,
    /// When `true`, the result of an executed tool is sent back automatically.
    pub auto_respond_tools: bool,
    /// Upper bound on concurrently executing tools.
    pub max_concurrent_tools: usize,
}

impl Default for RunnerConfig {
    /// Defaults: tools are executed and answered automatically, with a limit of four
    /// concurrent tool executions.
    fn default() -> Self {
        RunnerConfig {
            auto_execute_tools: true,
            auto_respond_tools: true,
            max_concurrent_tools: 4,
        }
    }
}
165
166/// Builder for RealtimeRunner.
167pub struct RealtimeRunnerBuilder {
168 model: Option<BoxedModel>,
169 config: RealtimeConfig,
170 runner_config: RunnerConfig,
171 tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
172 event_handler: Option<Arc<dyn EventHandler>>,
173}
174
175impl Default for RealtimeRunnerBuilder {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
181impl RealtimeRunnerBuilder {
182 /// Create a new builder.
183 pub fn new() -> Self {
184 Self {
185 model: None,
186 config: RealtimeConfig::default(),
187 runner_config: RunnerConfig::default(),
188 tools: HashMap::new(),
189 event_handler: None,
190 }
191 }
192
193 /// Set the realtime model.
194 pub fn model(mut self, model: BoxedModel) -> Self {
195 self.model = Some(model);
196 self
197 }
198
199 /// Set the session configuration.
200 pub fn config(mut self, config: RealtimeConfig) -> Self {
201 self.config = config;
202 self
203 }
204
205 /// Set the runner configuration.
206 pub fn runner_config(mut self, config: RunnerConfig) -> Self {
207 self.runner_config = config;
208 self
209 }
210
211 /// Set the system instruction.
212 pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
213 self.config.instruction = Some(instruction.into());
214 self
215 }
216
217 /// Set the voice.
218 pub fn voice(mut self, voice: impl Into<String>) -> Self {
219 self.config.voice = Some(voice.into());
220 self
221 }
222
223 /// Register a tool with its handler.
224 pub fn tool(mut self, definition: ToolDefinition, handler: impl ToolHandler + 'static) -> Self {
225 let name = definition.name.clone();
226 self.tools.insert(name, (definition, Arc::new(handler)));
227 self
228 }
229
230 /// Register a tool with a sync function handler.
231 pub fn tool_fn<F>(self, definition: ToolDefinition, handler: F) -> Self
232 where
233 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync + 'static,
234 {
235 self.tool(definition, FnToolHandler::new(handler))
236 }
237
238 /// Set the event handler.
239 pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
240 self.event_handler = Some(Arc::new(handler));
241 self
242 }
243
244 /// Build the runner (does not connect yet).
245 pub fn build(self) -> Result<RealtimeRunner> {
246 let model = self.model.ok_or_else(|| RealtimeError::config("Model is required"))?;
247
248 // Add tool definitions to config
249 let mut config = self.config;
250 if !self.tools.is_empty() {
251 let tool_defs: Vec<ToolDefinition> =
252 self.tools.values().map(|(def, _)| def.clone()).collect();
253 config.tools = Some(tool_defs);
254 }
255
256 Ok(RealtimeRunner {
257 model,
258 config: Arc::new(RwLock::new(config)),
259 runner_config: self.runner_config,
260 tools: self.tools,
261 event_handler: self.event_handler.unwrap_or_else(|| Arc::new(NoOpEventHandler)),
262 session: Arc::new(RwLock::new(None)),
263 state: Arc::new(RwLock::new(RunnerState::Idle)),
264 })
265 }
266}
267
268/// A runner that manages a realtime session with tool execution.
269///
270/// RealtimeRunner provides a high-level interface for:
271/// - Connecting to realtime providers
272/// - Automatically executing tool calls
273/// - Routing events to handlers
274/// - Managing the session lifecycle
275///
276/// # Example
277///
278/// ```rust,ignore
279/// use adk_realtime::{RealtimeRunner, RealtimeConfig, ToolDefinition};
280/// use adk_realtime::openai::OpenAIRealtimeModel;
281///
282/// #[tokio::main]
283/// async fn main() -> Result<()> {
284/// let model = OpenAIRealtimeModel::new(api_key, "gpt-4o-realtime-preview-2024-12-17");
285///
286/// let runner = RealtimeRunner::builder()
287/// .model(Box::new(model))
288/// .instruction("You are a helpful voice assistant.")
289/// .voice("alloy")
290/// .tool_fn(
291/// ToolDefinition::new("get_weather")
292/// .with_description("Get weather for a location"),
293/// |call| {
294/// Ok(serde_json::json!({"temperature": 72, "condition": "sunny"}))
295/// }
296/// )
297/// .build()?;
298///
299/// runner.connect().await?;
300/// runner.run().await?;
301///
302/// Ok(())
303/// }
304/// ```
305pub struct RealtimeRunner {
306 model: BoxedModel,
307 config: Arc<RwLock<RealtimeConfig>>,
308 runner_config: RunnerConfig,
309 tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
310 event_handler: Arc<dyn EventHandler>,
311 session: Arc<RwLock<Option<Arc<dyn crate::session::RealtimeSession>>>>,
312 state: Arc<RwLock<RunnerState>>,
313}
314
315impl RealtimeRunner {
316 /// Helper to safely acquire a cloned Arc of the current session, dropping the lock.
317 async fn session_handle(&self) -> Result<Arc<dyn crate::session::RealtimeSession>> {
318 let guard = self.session.read().await;
319 guard.as_ref().cloned().ok_or_else(|| RealtimeError::connection("Not connected"))
320 }
321
322 /// Create a new builder.
323 pub fn builder() -> RealtimeRunnerBuilder {
324 RealtimeRunnerBuilder::new()
325 }
326
327 /// Connect to the realtime provider.
328 pub async fn connect(&self) -> Result<()> {
329 let config = self.config.read().await.clone();
330 let session = self.model.connect(config).await?;
331 let mut guard = self.session.write().await;
332 *guard = Some(session.into());
333 Ok(())
334 }
335
336 /// Check if currently connected.
337 pub async fn is_connected(&self) -> bool {
338 let guard = self.session.read().await;
339 guard.as_ref().map(|s| s.is_connected()).unwrap_or(false)
340 }
341
342 /// Get the session ID if connected.
343 pub async fn session_id(&self) -> Option<String> {
344 let guard = self.session.read().await;
345 guard.as_ref().map(|s| s.session_id().to_string())
346 }
347
348 /// Send a client event directly to the session.
349 ///
350 /// This method intercepts internal control-plane events (like `UpdateSession`) to route
351 /// them through the provider-agnostic orchestration layer instead of forwarding raw JSON
352 /// to the underlying WebSocket transport. This guarantees that `adk-realtime` never leaks
353 /// invalid event payloads to providers (e.g., OpenAI or Gemini) and universally bridges
354 /// the Cognitive Handoff mechanics transparently.
355 pub async fn send_client_event(&self, event: crate::events::ClientEvent) -> Result<()> {
356 match event {
357 crate::events::ClientEvent::UpdateSession { instructions, tools } => {
358 let update_config = SessionUpdateConfig(crate::config::RealtimeConfig {
359 instruction: instructions,
360 tools,
361 ..Default::default()
362 });
363 self.update_session(update_config).await
364 }
365 other => {
366 let session = self.session_handle().await?;
367 session.send_event(other).await
368 }
369 }
370 }
371
372 /// Internal helper to merge a `SessionUpdateConfig` delta into the canonical `RealtimeConfig` state.
373 ///
374 /// **Why this exists**: The `RealtimeRunner` must maintain an absolute, single source of truth
375 /// for its configuration (`self.config`). Orchestrators fire `SessionUpdateConfig`s as sparse
376 /// partial deltas (intents to hot-swap instructions or tools mid-flight). By accumulating
377 /// these sparse updates into the single `base` config, any subsequent "Phantom Reconnect"
378 /// (e.g., due to a Gemini domain shift or an unexpected network drop) natively inherits all
379 /// prior hot-swaps alongside the immutable transport parameters (like sample rates) defined at startup.
380 ///
381 /// Note: This is intentionally narrow and specifically scoped to merge only
382 /// hot-swappable cognitive fields (instruction, tools, voice, temperature, extra).
383 /// Transport-level attributes like sample rates and audio formats are not dynamically swappable.
384 fn merge_config(base: &mut RealtimeConfig, update: &SessionUpdateConfig) {
385 if let Some(instruction) = &update.0.instruction {
386 base.instruction = Some(instruction.clone());
387 }
388 if let Some(tools) = &update.0.tools {
389 base.tools = Some(tools.clone());
390 }
391 if let Some(voice) = &update.0.voice {
392 base.voice = Some(voice.clone());
393 }
394 if let Some(temp) = update.0.temperature {
395 base.temperature = Some(temp);
396 }
397 if let Some(extra) = &update.0.extra {
398 base.extra = Some(extra.clone());
399 }
400 }
401
402 /// Update the session configuration.
403 ///
404 /// Delegates to [`Self::update_session_with_bridge`] with no bridge message.
405 ///
406 /// # Example
407 ///
408 /// ```rust,ignore
409 /// use adk_realtime::config::{SessionUpdateConfig, RealtimeConfig};
410 ///
411 /// async fn example(runner: &adk_realtime::RealtimeRunner) {
412 /// let update = SessionUpdateConfig(
413 /// RealtimeConfig::default().with_instruction("You are now a pirate.")
414 /// );
415 /// runner.update_session(update).await.unwrap();
416 /// }
417 /// ```
418 pub async fn update_session(&self, config: SessionUpdateConfig) -> Result<()> {
419 self.update_session_with_bridge(config, None).await
420 }
421
422 /// Update the session configuration, optionally injecting a bridge message if
423 /// a transport resumption (Phantom Reconnect) occurs.
424 ///
425 /// The RealtimeRunner will attempt to mutate the session natively if the underlying
426 /// API supports it (e.g., OpenAI). If it does not (e.g., Gemini), the Runner will
427 /// queue a transport resumption, executing it only when the session
428 /// is in a resumable state (Idle) to prevent data corruption.
429 ///
430 /// The runner keeps only one pending resumption. If a new session update arrives while
431 /// a resumption is already pending, the previous pending resumption is replaced. This is
432 /// intentional: pending session updates represent desired end state, not an ordered command queue.
433 /// The policy is last write wins.
434 pub async fn update_session_with_bridge(
435 &self,
436 config: SessionUpdateConfig,
437 bridge_message: Option<String>,
438 ) -> Result<()> {
439 // 1. Merge the incoming delta into the runner's canonical, persisted configuration.
440 // This ensures that any future reconnects (e.g., due to network drops) naturally
441 // inherit this latest state.
442 let mut full_config = self.config.write().await;
443 Self::merge_config(&mut full_config, &config);
444
445 let cloned_config = full_config.clone();
446 drop(full_config); // Free the write lock early to avoid deadlocks.
447
448 // 2. Safely obtain a cloned handle of the active session.
449 let session = self.session_handle().await?;
450
451 // 3. Delegate the mutation attempt to the provider-specific adapter.
452 match session.mutate_context(cloned_config).await? {
453 // PATH A: Native Mutability (e.g., OpenAI)
454 // The provider natively updated the context over the active WebSocket.
455 ContextMutationOutcome::Applied => {
456 tracing::info!("Context mutated natively mid-flight.");
457
458 // Since the transport wasn't dropped, we can inject the bridge message
459 // immediately as a standard user message to update the model's short-term memory.
460 if let Some(msg) = bridge_message {
461 let event = crate::events::ClientEvent::Message {
462 role: "user".to_string(),
463 parts: vec![adk_core::types::Part::Text { text: msg }],
464 };
465 session.send_event(event).await?;
466 }
467 Ok(())
468 }
469
470 // PATH B: Rigid Initialization (e.g., Gemini)
471 // The provider requires us to tear down the WebSocket and establish a new one (Phantom Reconnect).
472 ContextMutationOutcome::RequiresResumption(new_config) => {
473 drop(session); // CRITICAL: Drop the cloned handle before attempting state mutation.
474
475 // 4. Check the Runner's internal state machine to ensure it is safe to tear down the socket.
476 let mut state_guard = self.state.write().await;
477
478 if *state_guard == RunnerState::Idle {
479 // Safe to reconnect: The model is neither generating audio nor executing a tool.
480 drop(state_guard); // Free state lock before the heavy async network operation.
481 tracing::info!("Runner is idle. Executing resumption immediately.");
482
483 if let Err(e) =
484 self.execute_resumption((*new_config).clone(), bridge_message.clone()).await
485 {
486 tracing::error!("Immediate resumption failed: {}. Queueing for retry.", e);
487 // If the reconnect fails (e.g., transient network issue), we must not lose the mutation intent.
488 // We push it back into the queue for the background loop to retry.
489 let mut fallback_state = self.state.write().await;
490 *fallback_state = RunnerState::PendingResumption {
491 config: Box::new(*new_config),
492 bridge_message,
493 attempts: 1,
494 };
495 return Err(e);
496 }
497 } else {
498 // Unsafe to reconnect: Tearing down the socket now would corrupt the in-flight context.
499 // We must queue the mutation. The event loop will execute it once it returns to Idle.
500 if let RunnerState::PendingResumption { .. } = *state_guard {
501 tracing::warn!(
502 "Runner already had a pending resumption. Overwriting with last-write-wins policy."
503 );
504 } else {
505 tracing::info!("Runner is busy ({:?}). Queueing resumption.", *state_guard);
506 }
507
508 // Queue the intent using a last-write-wins policy.
509 *state_guard = RunnerState::PendingResumption {
510 config: new_config,
511 bridge_message,
512 attempts: 0,
513 };
514 }
515 Ok(())
516 }
517 }
518 }
519
520 /// Internal helper to execute a transport resumption (teardown and rebuild).
521 async fn execute_resumption(
522 &self,
523 new_config: crate::config::RealtimeConfig,
524 bridge_message: Option<String>,
525 ) -> Result<()> {
526 tracing::warn!("Executing transport resumption with new configuration.");
527
528 // 1. Extract the old session safely under the write lock.
529 let old_session = {
530 let mut write_guard = self.session.write().await;
531 write_guard.take()
532 };
533
534 // 2. Explicitly tear down the old WebSocket connection to release upstream resources.
535 // Do this WITHOUT holding the lock across `.await`.
536 if let Some(session) = old_session {
537 if let Err(e) = session.close().await {
538 tracing::warn!("Failed to cleanly close old session during resumption: {}", e);
539 }
540 }
541
542 // 3. Establish a brand new connection using the provider-agnostic factory interface.
543 // If the provider supports resumption natively (like Gemini), the `new_config`
544 // payload already contains the cached `resumeToken`.
545 let new_session = self.model.connect(new_config).await?;
546
547 // 4. Overwrite the active session pointer with the newly connected transport.
548 {
549 let mut write_guard = self.session.write().await;
550 *write_guard = Some(new_session.into());
551 }
552
553 // 5. If the orchestrator provided a bridge message (e.g. to explain the domain shift),
554 // safely inject it into the new connection's context window.
555 if let Some(msg) = bridge_message {
556 self.inject_bridge_message(msg).await?;
557 }
558
559 tracing::info!("Resumption complete. New transport established.");
560 Ok(())
561 }
562
563 /// Internal helper to safely inject a bridge message directly into the active session.
564 ///
565 /// This intentionally bypasses the `send_client_event` router to avoid `E0733`
566 /// (un-Boxed async recursion) where `send_client_event` -> `update_session` ->
567 /// `execute_resumption` -> `send_client_event` creates an infinite compiler loop.
568 async fn inject_bridge_message(&self, msg: String) -> Result<()> {
569 tracing::info!("Injecting bridge message post-resumption.");
570 let event = crate::events::ClientEvent::Message {
571 role: "user".to_string(),
572 parts: vec![adk_core::types::Part::Text { text: msg }],
573 };
574 let session = self.session_handle().await?;
575 session.send_event(event).await
576 }
577
578 /// Send audio to the session.
579 pub async fn send_audio(&self, audio_base64: &str) -> Result<()> {
580 let session = self.session_handle().await?;
581 session.send_audio_base64(audio_base64).await
582 }
583
584 /// Send text to the session.
585 pub async fn send_text(&self, text: &str) -> Result<()> {
586 let session = self.session_handle().await?;
587 session.send_text(text).await
588 }
589
590 /// Commit the audio buffer (for manual VAD mode).
591 pub async fn commit_audio(&self) -> Result<()> {
592 let session = self.session_handle().await?;
593 session.commit_audio().await
594 }
595
596 /// Trigger a response from the model.
597 pub async fn create_response(&self) -> Result<()> {
598 let session = self.session_handle().await?;
599 session.create_response().await
600 }
601
602 /// Interrupt the current response.
603 pub async fn interrupt(&self) -> Result<()> {
604 let session = self.session_handle().await?;
605 session.interrupt().await
606 }
607
608 /// Get the next raw event from the session.
609 ///
610 /// # Example
611 ///
612 /// ```rust,ignore
613 /// use adk_realtime::events::ServerEvent;
614 /// use tracing::{info, error};
615 ///
616 /// async fn process_events(runner: &adk_realtime::RealtimeRunner) {
617 /// while let Some(event) = runner.next_event().await {
618 /// match event {
619 /// Ok(ServerEvent::SpeechStarted { .. }) => info!("User is speaking"),
620 /// Ok(_) => info!("Received other event"),
621 /// Err(e) => error!("Error: {e}"),
622 /// }
623 /// }
624 /// }
625 /// ```
626 pub async fn next_event(&self) -> Option<Result<ServerEvent>> {
627 let session = match self.session_handle().await {
628 Ok(session) => session,
629 Err(_) => {
630 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
631 return None;
632 }
633 };
634
635 // Some sessions might yield inside next_event, but just in case, yield here too
636 tokio::task::yield_now().await;
637 session.next_event().await
638 }
639
640 /// Send a tool response to the session.
641 ///
642 /// # Example
643 ///
644 /// ```rust,ignore
645 /// use adk_realtime::events::ToolResponse;
646 /// use serde_json::json;
647 ///
648 /// async fn example(runner: &adk_realtime::RealtimeRunner) {
649 /// let response = ToolResponse {
650 /// call_id: "call_123".to_string(),
651 /// output: json!({"temperature": 72}),
652 /// };
653 /// runner.send_tool_response(response).await.unwrap();
654 /// }
655 /// ```
656 pub async fn send_tool_response(&self, response: ToolResponse) -> Result<()> {
657 let session = self.session_handle().await?;
658 session.send_tool_response(response).await
659 }
660
661 /// Run the event loop, processing events until disconnected.
662 pub async fn run(&self) -> Result<()> {
663 loop {
664 let session = self.session_handle().await?;
665 let old_session_id = session.session_id().to_string();
666 let event = session.next_event().await;
667
668 match event {
669 Some(Ok(event)) => {
670 self.handle_event(event).await?;
671 }
672 Some(Err(e)) => {
673 self.event_handler.on_error(&e).await?;
674 return Err(e);
675 }
676 None => {
677 // Session closed or swapped out. Check if a new session was installed (e.g., during reconnect).
678 let current_session_id = self.session_id().await;
679 if let Some(id) = current_session_id {
680 if id != old_session_id {
681 // A new session handle was installed concurrently. Continue polling.
682 continue;
683 }
684 }
685 // It was a real disconnect.
686 break;
687 }
688 }
689 }
690 Ok(())
691 }
692
693 /// Process a single event.
694 async fn handle_event(&self, event: ServerEvent) -> Result<()> {
695 // Track state transitions before forwarding the event
696 match &event {
697 ServerEvent::ResponseCreated { .. } => {
698 let mut state = self.state.write().await;
699 if let RunnerState::Idle = *state {
700 *state = RunnerState::Generating;
701 }
702 }
703 ServerEvent::FunctionCallDone { .. } => {
704 let mut state = self.state.write().await;
705 if let RunnerState::Generating | RunnerState::Idle = *state {
706 *state = RunnerState::ExecutingTool;
707 }
708 }
709 _ => {}
710 }
711
712 match event {
713 ServerEvent::AudioDelta { delta, item_id, .. } => {
714 self.event_handler.on_audio(&delta, &item_id).await?;
715 }
716 ServerEvent::TextDelta { delta, item_id, .. } => {
717 self.event_handler.on_text(&delta, &item_id).await?;
718 }
719 ServerEvent::TranscriptDelta { delta, item_id, .. } => {
720 self.event_handler.on_transcript(&delta, &item_id).await?;
721 }
722 ServerEvent::SpeechStarted { audio_start_ms, .. } => {
723 self.event_handler.on_speech_started(audio_start_ms).await?;
724 }
725 ServerEvent::SpeechStopped { audio_end_ms, .. } => {
726 self.event_handler.on_speech_stopped(audio_end_ms).await?;
727 }
728 ServerEvent::ResponseDone { .. } => {
729 self.event_handler.on_response_done().await?;
730 self.check_resumption_queue().await?;
731 }
732 ServerEvent::FunctionCallDone { call_id, name, arguments, .. } => {
733 if self.runner_config.auto_execute_tools {
734 self.execute_tool_call(&call_id, &name, &arguments).await?;
735 }
736 }
737 ServerEvent::SessionUpdated { session, .. } => {
738 // Check if the generic session update contains a resumption token
739 if let Some(token) = session.get("resumeToken").and_then(|t| t.as_str()) {
740 tracing::info!(
741 "Received Gemini sessionResumption token, saving for future reconnects."
742 );
743 let mut config = self.config.write().await;
744 let mut extra = config.extra.clone().unwrap_or_else(|| serde_json::json!({}));
745 extra["resumeToken"] = serde_json::Value::String(token.to_string());
746 config.extra = Some(extra);
747 }
748 }
749 ServerEvent::Error { error, .. } => {
750 let err = RealtimeError::server(error.code.unwrap_or_default(), error.message);
751 self.event_handler.on_error(&err).await?;
752 }
753 _ => {
754 // Ignore other events
755 }
756 }
757 Ok(())
758 }
759
760 /// Safely transitions the runner back to Idle and executes any queued resumptions.
761 async fn check_resumption_queue(&self) -> Result<()> {
762 // 1. Acquire the state lock to inspect the queue.
763 let mut state = self.state.write().await;
764
765 // 2. Extract the pending configuration and attempt count if one exists.
766 let pending =
767 if let RunnerState::PendingResumption { config, bridge_message, attempts } = &*state {
768 Some((config.clone(), bridge_message.clone(), *attempts))
769 } else {
770 None
771 };
772
773 if let Some((config, bridge_message, attempts)) = pending {
774 tracing::info!(
775 "Executing queued resumption after turn completion. (Attempt {})",
776 attempts + 1
777 );
778
779 // 3. Mark the state as Idle so the background loop is unblocked.
780 *state = RunnerState::Idle;
781
782 // 4. Release the state lock *before* performing the heavy async socket connection.
783 drop(state);
784
785 // 5. Attempt the actual transport teardown/rebuild.
786 if let Err(e) = self.execute_resumption((*config).clone(), bridge_message.clone()).await
787 {
788 tracing::error!("Resumption failed: {}.", e);
789
790 // 6. If the reconnect fails (e.g., transient network error), re-acquire the lock
791 // to safely handle the retry logic without crashing the event loop.
792 let mut fallback_state = self.state.write().await;
793
794 // 7. Enforce a maximum retry budget to prevent infinite "hot-looping"
795 if attempts + 1 >= 3 {
796 tracing::error!(
797 "Maximum resumption attempts reached (3). Dropping queued mutation to prevent infinite loop."
798 );
799 *fallback_state = RunnerState::Idle;
800 } else {
801 tracing::info!("Restoring pending queue state for retry.");
802 *fallback_state = RunnerState::PendingResumption {
803 config,
804 bridge_message,
805 attempts: attempts + 1,
806 };
807 }
808
809 // 8. Do not return Err(e) here, as that would permanently kill the `run()` loop.
810 // Instead, report the error to the downstream handler and allow the event loop to continue spinning.
811 let _ = self.event_handler.on_error(&e).await;
812 }
813 } else {
814 // No resumptions were queued; simply mark as Idle.
815 *state = RunnerState::Idle;
816 }
817 Ok(())
818 }
819
820 /// Execute a tool call and optionally send the response.
821 async fn execute_tool_call(&self, call_id: &str, name: &str, arguments: &str) -> Result<()> {
822 let handler = self.tools.get(name).map(|(_, h)| h.clone());
823
824 let result = if let Some(handler) = handler {
825 let args: serde_json::Value = serde_json::from_str(arguments)
826 .unwrap_or(serde_json::Value::Object(Default::default()));
827
828 let call =
829 ToolCall { call_id: call_id.to_string(), name: name.to_string(), arguments: args };
830
831 match handler.execute(&call).await {
832 Ok(value) => value,
833 Err(e) => serde_json::json!({
834 "error": e.to_string()
835 }),
836 }
837 } else {
838 serde_json::json!({
839 "error": format!("Unknown tool: {}", name)
840 })
841 };
842
843 if self.runner_config.auto_respond_tools {
844 let response = ToolResponse { call_id: call_id.to_string(), output: result };
845
846 if let Ok(session) = self.session_handle().await {
847 session.send_tool_response(response).await?;
848 }
849 }
850
851 Ok(())
852 }
853
854 /// Close the session.
855 pub async fn close(&self) -> Result<()> {
856 if let Ok(session) = self.session_handle().await {
857 session.close().await?;
858 }
859 Ok(())
860 }
861}