Skip to main content

adk_realtime/
runner.rs

1//! RealtimeRunner for integrating realtime sessions with agents.
2//!
3//! This module provides the bridge between realtime audio sessions and
4//! the ADK agent framework, handling tool execution and event routing.
5
6use crate::config::{RealtimeConfig, SessionUpdateConfig, ToolDefinition};
7use crate::error::{RealtimeError, Result};
8use crate::events::{ServerEvent, ToolCall, ToolResponse};
9use crate::model::BoxedModel;
10use crate::session::ContextMutationOutcome;
11use async_trait::async_trait;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// Internal state machine tracking the resumability status of the RealtimeRunner.
17#[derive(Debug, Clone, Default, PartialEq)]
18pub enum RunnerState {
19    /// Runner is ready to accept transport resumption immediately.
20    #[default]
21    Idle,
22    /// Model is currently generating a response; tearing down the connection would corrupt context.
23    Generating,
24    /// A tool is currently executing; teardown would cause tool loss.
25    ExecutingTool,
26    /// A context mutation was queued while the runner was busy, and must be executed once Idle.
27    ///
28    /// **Provider Context:** This state is only utilized by providers that do *not* support
29    /// native mid-flight mutability (e.g., Gemini Live), requiring a physical transport teardown
30    /// and rebuild (Phantom Reconnect). Providers like OpenAI natively apply `session.update`
31    /// frames instantly and will never enter this queued state.
32    ///
33    /// **Queue Policy:** The runner keeps only one pending resumption. If a new session update
34    /// arrives while a resumption is already pending, the previous pending resumption is replaced.
35    /// This is intentional: pending session updates represent desired end state, not an ordered
36    /// command queue. The policy is last write wins.
37    PendingResumption {
38        /// The new configuration to apply on reconnection.
39        config: Box<crate::config::RealtimeConfig>,
40        /// An optional message to inject immediately after resumption.
41        bridge_message: Option<String>,
42        /// Number of failed reconnection attempts for this mutation.
43        attempts: u8,
44    },
45}
46
47/// Handler for tool/function calls from the realtime model.
48#[async_trait]
49pub trait ToolHandler: Send + Sync {
50    /// Execute a tool call and return the result.
51    async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value>;
52}
53
54/// A simple function-based tool handler.
55pub struct FnToolHandler<F>
56where
57    F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
58{
59    handler: F,
60}
61
62impl<F> FnToolHandler<F>
63where
64    F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
65{
66    /// Create a new function-based tool handler.
67    pub fn new(handler: F) -> Self {
68        Self { handler }
69    }
70}
71
72#[async_trait]
73impl<F> ToolHandler for FnToolHandler<F>
74where
75    F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
76{
77    async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value> {
78        (self.handler)(call)
79    }
80}
81
82/// Async function-based tool handler.
83#[allow(dead_code)]
84pub struct AsyncToolHandler<F, Fut>
85where
86    F: Fn(ToolCall) -> Fut + Send + Sync,
87    Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
88{
89    handler: F,
90}
91
92impl<F, Fut> AsyncToolHandler<F, Fut>
93where
94    F: Fn(ToolCall) -> Fut + Send + Sync,
95    Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
96{
97    /// Create a new async tool handler.
98    pub fn new(handler: F) -> Self {
99        Self { handler }
100    }
101}
102
103/// Event handler for processing realtime events.
104#[async_trait]
105pub trait EventHandler: Send + Sync {
106    /// Called when an audio delta is received (raw PCM bytes).
107    async fn on_audio(&self, _audio: &[u8], _item_id: &str) -> Result<()> {
108        Ok(())
109    }
110
111    /// Called when a text delta is received.
112    async fn on_text(&self, _text: &str, _item_id: &str) -> Result<()> {
113        Ok(())
114    }
115
116    /// Called when a transcript delta is received.
117    async fn on_transcript(&self, _transcript: &str, _item_id: &str) -> Result<()> {
118        Ok(())
119    }
120
121    /// Called when speech is detected.
122    async fn on_speech_started(&self, _audio_start_ms: u64) -> Result<()> {
123        Ok(())
124    }
125
126    /// Called when speech ends.
127    async fn on_speech_stopped(&self, _audio_end_ms: u64) -> Result<()> {
128        Ok(())
129    }
130
131    /// Called when a response completes.
132    async fn on_response_done(&self) -> Result<()> {
133        Ok(())
134    }
135
136    /// Called on any error.
137    async fn on_error(&self, _error: &RealtimeError) -> Result<()> {
138        Ok(())
139    }
140}
141
142/// Default no-op event handler.
143#[derive(Debug, Clone, Default)]
144pub struct NoOpEventHandler;
145
146#[async_trait]
147impl EventHandler for NoOpEventHandler {}
148
/// Configuration for the [`RealtimeRunner`]'s tool-handling behaviour.
///
/// Derives `Debug` so runner settings can be logged and inspected like the other public
/// types in this module.
#[derive(Debug, Clone)]
pub struct RunnerConfig {
    /// Whether to execute tool calls automatically when the model finishes emitting them.
    pub auto_execute_tools: bool,
    /// Whether to automatically send tool responses.
    pub auto_respond_tools: bool,
    /// Maximum concurrent tool executions.
    pub max_concurrent_tools: usize,
}

impl Default for RunnerConfig {
    /// Auto-execution and auto-response enabled, at most 4 concurrent tool executions.
    fn default() -> Self {
        Self { auto_execute_tools: true, auto_respond_tools: true, max_concurrent_tools: 4 }
    }
}
165
166/// Builder for RealtimeRunner.
167pub struct RealtimeRunnerBuilder {
168    model: Option<BoxedModel>,
169    config: RealtimeConfig,
170    runner_config: RunnerConfig,
171    tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
172    event_handler: Option<Arc<dyn EventHandler>>,
173}
174
175impl Default for RealtimeRunnerBuilder {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
181impl RealtimeRunnerBuilder {
182    /// Create a new builder.
183    pub fn new() -> Self {
184        Self {
185            model: None,
186            config: RealtimeConfig::default(),
187            runner_config: RunnerConfig::default(),
188            tools: HashMap::new(),
189            event_handler: None,
190        }
191    }
192
193    /// Set the realtime model.
194    pub fn model(mut self, model: BoxedModel) -> Self {
195        self.model = Some(model);
196        self
197    }
198
199    /// Set the session configuration.
200    pub fn config(mut self, config: RealtimeConfig) -> Self {
201        self.config = config;
202        self
203    }
204
205    /// Set the runner configuration.
206    pub fn runner_config(mut self, config: RunnerConfig) -> Self {
207        self.runner_config = config;
208        self
209    }
210
211    /// Set the system instruction.
212    pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
213        self.config.instruction = Some(instruction.into());
214        self
215    }
216
217    /// Set the voice.
218    pub fn voice(mut self, voice: impl Into<String>) -> Self {
219        self.config.voice = Some(voice.into());
220        self
221    }
222
223    /// Register a tool with its handler.
224    pub fn tool(mut self, definition: ToolDefinition, handler: impl ToolHandler + 'static) -> Self {
225        let name = definition.name.clone();
226        self.tools.insert(name, (definition, Arc::new(handler)));
227        self
228    }
229
230    /// Register a tool with a sync function handler.
231    pub fn tool_fn<F>(self, definition: ToolDefinition, handler: F) -> Self
232    where
233        F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync + 'static,
234    {
235        self.tool(definition, FnToolHandler::new(handler))
236    }
237
238    /// Set the event handler.
239    pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
240        self.event_handler = Some(Arc::new(handler));
241        self
242    }
243
244    /// Build the runner (does not connect yet).
245    pub fn build(self) -> Result<RealtimeRunner> {
246        let model = self.model.ok_or_else(|| RealtimeError::config("Model is required"))?;
247
248        // Add tool definitions to config
249        let mut config = self.config;
250        if !self.tools.is_empty() {
251            let tool_defs: Vec<ToolDefinition> =
252                self.tools.values().map(|(def, _)| def.clone()).collect();
253            config.tools = Some(tool_defs);
254        }
255
256        Ok(RealtimeRunner {
257            model,
258            config: Arc::new(RwLock::new(config)),
259            runner_config: self.runner_config,
260            tools: self.tools,
261            event_handler: self.event_handler.unwrap_or_else(|| Arc::new(NoOpEventHandler)),
262            session: Arc::new(RwLock::new(None)),
263            state: Arc::new(RwLock::new(RunnerState::Idle)),
264        })
265    }
266}
267
268/// A runner that manages a realtime session with tool execution.
269///
270/// RealtimeRunner provides a high-level interface for:
271/// - Connecting to realtime providers
272/// - Automatically executing tool calls
273/// - Routing events to handlers
274/// - Managing the session lifecycle
275///
276/// # Example
277///
278/// ```rust,ignore
279/// use adk_realtime::{RealtimeRunner, RealtimeConfig, ToolDefinition};
280/// use adk_realtime::openai::OpenAIRealtimeModel;
281///
282/// #[tokio::main]
283/// async fn main() -> Result<()> {
284///     let model = OpenAIRealtimeModel::new(api_key, "gpt-4o-realtime-preview-2024-12-17");
285///
286///     let runner = RealtimeRunner::builder()
287///         .model(Box::new(model))
288///         .instruction("You are a helpful voice assistant.")
289///         .voice("alloy")
290///         .tool_fn(
291///             ToolDefinition::new("get_weather")
292///                 .with_description("Get weather for a location"),
293///             |call| {
294///                 Ok(serde_json::json!({"temperature": 72, "condition": "sunny"}))
295///             }
296///         )
297///         .build()?;
298///
299///     runner.connect().await?;
300///     runner.run().await?;
301///
302///     Ok(())
303/// }
304/// ```
305pub struct RealtimeRunner {
306    model: BoxedModel,
307    config: Arc<RwLock<RealtimeConfig>>,
308    runner_config: RunnerConfig,
309    tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
310    event_handler: Arc<dyn EventHandler>,
311    session: Arc<RwLock<Option<Arc<dyn crate::session::RealtimeSession>>>>,
312    state: Arc<RwLock<RunnerState>>,
313}
314
315impl RealtimeRunner {
316    /// Helper to safely acquire a cloned Arc of the current session, dropping the lock.
317    async fn session_handle(&self) -> Result<Arc<dyn crate::session::RealtimeSession>> {
318        let guard = self.session.read().await;
319        guard.as_ref().cloned().ok_or_else(|| RealtimeError::connection("Not connected"))
320    }
321
322    /// Create a new builder.
323    pub fn builder() -> RealtimeRunnerBuilder {
324        RealtimeRunnerBuilder::new()
325    }
326
327    /// Connect to the realtime provider.
328    pub async fn connect(&self) -> Result<()> {
329        let config = self.config.read().await.clone();
330        let session = self.model.connect(config).await?;
331        let mut guard = self.session.write().await;
332        *guard = Some(session.into());
333        Ok(())
334    }
335
336    /// Check if currently connected.
337    pub async fn is_connected(&self) -> bool {
338        let guard = self.session.read().await;
339        guard.as_ref().map(|s| s.is_connected()).unwrap_or(false)
340    }
341
342    /// Get the session ID if connected.
343    pub async fn session_id(&self) -> Option<String> {
344        let guard = self.session.read().await;
345        guard.as_ref().map(|s| s.session_id().to_string())
346    }
347
348    /// Send a client event directly to the session.
349    ///
350    /// This method intercepts internal control-plane events (like `UpdateSession`) to route
351    /// them through the provider-agnostic orchestration layer instead of forwarding raw JSON
352    /// to the underlying WebSocket transport. This guarantees that `adk-realtime` never leaks
353    /// invalid event payloads to providers (e.g., OpenAI or Gemini) and universally bridges
354    /// the Cognitive Handoff mechanics transparently.
355    pub async fn send_client_event(&self, event: crate::events::ClientEvent) -> Result<()> {
356        match event {
357            crate::events::ClientEvent::UpdateSession { instructions, tools } => {
358                let update_config = SessionUpdateConfig(crate::config::RealtimeConfig {
359                    instruction: instructions,
360                    tools,
361                    ..Default::default()
362                });
363                self.update_session(update_config).await
364            }
365            other => {
366                let session = self.session_handle().await?;
367                session.send_event(other).await
368            }
369        }
370    }
371
372    /// Internal helper to merge a `SessionUpdateConfig` delta into the canonical `RealtimeConfig` state.
373    ///
374    /// **Why this exists**: The `RealtimeRunner` must maintain an absolute, single source of truth
375    /// for its configuration (`self.config`). Orchestrators fire `SessionUpdateConfig`s as sparse
376    /// partial deltas (intents to hot-swap instructions or tools mid-flight). By accumulating
377    /// these sparse updates into the single `base` config, any subsequent "Phantom Reconnect"
378    /// (e.g., due to a Gemini domain shift or an unexpected network drop) natively inherits all
379    /// prior hot-swaps alongside the immutable transport parameters (like sample rates) defined at startup.
380    ///
381    /// Note: This is intentionally narrow and specifically scoped to merge only
382    /// hot-swappable cognitive fields (instruction, tools, voice, temperature, extra).
383    /// Transport-level attributes like sample rates and audio formats are not dynamically swappable.
384    fn merge_config(base: &mut RealtimeConfig, update: &SessionUpdateConfig) {
385        if let Some(instruction) = &update.0.instruction {
386            base.instruction = Some(instruction.clone());
387        }
388        if let Some(tools) = &update.0.tools {
389            base.tools = Some(tools.clone());
390        }
391        if let Some(voice) = &update.0.voice {
392            base.voice = Some(voice.clone());
393        }
394        if let Some(temp) = update.0.temperature {
395            base.temperature = Some(temp);
396        }
397        if let Some(extra) = &update.0.extra {
398            base.extra = Some(extra.clone());
399        }
400    }
401
402    /// Update the session configuration.
403    ///
404    /// Delegates to [`Self::update_session_with_bridge`] with no bridge message.
405    ///
406    /// # Example
407    ///
408    /// ```rust,ignore
409    /// use adk_realtime::config::{SessionUpdateConfig, RealtimeConfig};
410    ///
411    /// async fn example(runner: &adk_realtime::RealtimeRunner) {
412    ///     let update = SessionUpdateConfig(
413    ///         RealtimeConfig::default().with_instruction("You are now a pirate.")
414    ///     );
415    ///     runner.update_session(update).await.unwrap();
416    /// }
417    /// ```
418    pub async fn update_session(&self, config: SessionUpdateConfig) -> Result<()> {
419        self.update_session_with_bridge(config, None).await
420    }
421
422    /// Update the session configuration, optionally injecting a bridge message if
423    /// a transport resumption (Phantom Reconnect) occurs.
424    ///
425    /// The RealtimeRunner will attempt to mutate the session natively if the underlying
426    /// API supports it (e.g., OpenAI). If it does not (e.g., Gemini), the Runner will
427    /// queue a transport resumption, executing it only when the session
428    /// is in a resumable state (Idle) to prevent data corruption.
429    ///
430    /// The runner keeps only one pending resumption. If a new session update arrives while
431    /// a resumption is already pending, the previous pending resumption is replaced. This is
432    /// intentional: pending session updates represent desired end state, not an ordered command queue.
433    /// The policy is last write wins.
434    pub async fn update_session_with_bridge(
435        &self,
436        config: SessionUpdateConfig,
437        bridge_message: Option<String>,
438    ) -> Result<()> {
439        // 1. Merge the incoming delta into the runner's canonical, persisted configuration.
440        // This ensures that any future reconnects (e.g., due to network drops) naturally
441        // inherit this latest state.
442        let mut full_config = self.config.write().await;
443        Self::merge_config(&mut full_config, &config);
444
445        let cloned_config = full_config.clone();
446        drop(full_config); // Free the write lock early to avoid deadlocks.
447
448        // 2. Safely obtain a cloned handle of the active session.
449        let session = self.session_handle().await?;
450
451        // 3. Delegate the mutation attempt to the provider-specific adapter.
452        match session.mutate_context(cloned_config).await? {
453            // PATH A: Native Mutability (e.g., OpenAI)
454            // The provider natively updated the context over the active WebSocket.
455            ContextMutationOutcome::Applied => {
456                tracing::info!("Context mutated natively mid-flight.");
457
458                // Since the transport wasn't dropped, we can inject the bridge message
459                // immediately as a standard user message to update the model's short-term memory.
460                if let Some(msg) = bridge_message {
461                    let event = crate::events::ClientEvent::Message {
462                        role: "user".to_string(),
463                        parts: vec![adk_core::types::Part::Text { text: msg }],
464                    };
465                    session.send_event(event).await?;
466                }
467                Ok(())
468            }
469
470            // PATH B: Rigid Initialization (e.g., Gemini)
471            // The provider requires us to tear down the WebSocket and establish a new one (Phantom Reconnect).
472            ContextMutationOutcome::RequiresResumption(new_config) => {
473                drop(session); // CRITICAL: Drop the cloned handle before attempting state mutation.
474
475                // 4. Check the Runner's internal state machine to ensure it is safe to tear down the socket.
476                let mut state_guard = self.state.write().await;
477
478                if *state_guard == RunnerState::Idle {
479                    // Safe to reconnect: The model is neither generating audio nor executing a tool.
480                    drop(state_guard); // Free state lock before the heavy async network operation.
481                    tracing::info!("Runner is idle. Executing resumption immediately.");
482
483                    if let Err(e) =
484                        self.execute_resumption((*new_config).clone(), bridge_message.clone()).await
485                    {
486                        tracing::error!("Immediate resumption failed: {}. Queueing for retry.", e);
487                        // If the reconnect fails (e.g., transient network issue), we must not lose the mutation intent.
488                        // We push it back into the queue for the background loop to retry.
489                        let mut fallback_state = self.state.write().await;
490                        *fallback_state = RunnerState::PendingResumption {
491                            config: Box::new(*new_config),
492                            bridge_message,
493                            attempts: 1,
494                        };
495                        return Err(e);
496                    }
497                } else {
498                    // Unsafe to reconnect: Tearing down the socket now would corrupt the in-flight context.
499                    // We must queue the mutation. The event loop will execute it once it returns to Idle.
500                    if let RunnerState::PendingResumption { .. } = *state_guard {
501                        tracing::warn!(
502                            "Runner already had a pending resumption. Overwriting with last-write-wins policy."
503                        );
504                    } else {
505                        tracing::info!("Runner is busy ({:?}). Queueing resumption.", *state_guard);
506                    }
507
508                    // Queue the intent using a last-write-wins policy.
509                    *state_guard = RunnerState::PendingResumption {
510                        config: new_config,
511                        bridge_message,
512                        attempts: 0,
513                    };
514                }
515                Ok(())
516            }
517        }
518    }
519
520    /// Internal helper to execute a transport resumption (teardown and rebuild).
521    async fn execute_resumption(
522        &self,
523        new_config: crate::config::RealtimeConfig,
524        bridge_message: Option<String>,
525    ) -> Result<()> {
526        tracing::warn!("Executing transport resumption with new configuration.");
527
528        // 1. Extract the old session safely under the write lock.
529        let old_session = {
530            let mut write_guard = self.session.write().await;
531            write_guard.take()
532        };
533
534        // 2. Explicitly tear down the old WebSocket connection to release upstream resources.
535        // Do this WITHOUT holding the lock across `.await`.
536        if let Some(session) = old_session {
537            if let Err(e) = session.close().await {
538                tracing::warn!("Failed to cleanly close old session during resumption: {}", e);
539            }
540        }
541
542        // 3. Establish a brand new connection using the provider-agnostic factory interface.
543        // If the provider supports resumption natively (like Gemini), the `new_config`
544        // payload already contains the cached `resumeToken`.
545        let new_session = self.model.connect(new_config).await?;
546
547        // 4. Overwrite the active session pointer with the newly connected transport.
548        {
549            let mut write_guard = self.session.write().await;
550            *write_guard = Some(new_session.into());
551        }
552
553        // 5. If the orchestrator provided a bridge message (e.g. to explain the domain shift),
554        // safely inject it into the new connection's context window.
555        if let Some(msg) = bridge_message {
556            self.inject_bridge_message(msg).await?;
557        }
558
559        tracing::info!("Resumption complete. New transport established.");
560        Ok(())
561    }
562
563    /// Internal helper to safely inject a bridge message directly into the active session.
564    ///
565    /// This intentionally bypasses the `send_client_event` router to avoid `E0733`
566    /// (un-Boxed async recursion) where `send_client_event` -> `update_session` ->
567    /// `execute_resumption` -> `send_client_event` creates an infinite compiler loop.
568    async fn inject_bridge_message(&self, msg: String) -> Result<()> {
569        tracing::info!("Injecting bridge message post-resumption.");
570        let event = crate::events::ClientEvent::Message {
571            role: "user".to_string(),
572            parts: vec![adk_core::types::Part::Text { text: msg }],
573        };
574        let session = self.session_handle().await?;
575        session.send_event(event).await
576    }
577
578    /// Send audio to the session.
579    pub async fn send_audio(&self, audio_base64: &str) -> Result<()> {
580        let session = self.session_handle().await?;
581        session.send_audio_base64(audio_base64).await
582    }
583
584    /// Send text to the session.
585    pub async fn send_text(&self, text: &str) -> Result<()> {
586        let session = self.session_handle().await?;
587        session.send_text(text).await
588    }
589
590    /// Commit the audio buffer (for manual VAD mode).
591    pub async fn commit_audio(&self) -> Result<()> {
592        let session = self.session_handle().await?;
593        session.commit_audio().await
594    }
595
596    /// Trigger a response from the model.
597    pub async fn create_response(&self) -> Result<()> {
598        let session = self.session_handle().await?;
599        session.create_response().await
600    }
601
602    /// Interrupt the current response.
603    pub async fn interrupt(&self) -> Result<()> {
604        let session = self.session_handle().await?;
605        session.interrupt().await
606    }
607
608    /// Get the next raw event from the session.
609    ///
610    /// # Example
611    ///
612    /// ```rust,ignore
613    /// use adk_realtime::events::ServerEvent;
614    /// use tracing::{info, error};
615    ///
616    /// async fn process_events(runner: &adk_realtime::RealtimeRunner) {
617    ///     while let Some(event) = runner.next_event().await {
618    ///         match event {
619    ///             Ok(ServerEvent::SpeechStarted { .. }) => info!("User is speaking"),
620    ///             Ok(_) => info!("Received other event"),
621    ///             Err(e) => error!("Error: {e}"),
622    ///         }
623    ///     }
624    /// }
625    /// ```
626    pub async fn next_event(&self) -> Option<Result<ServerEvent>> {
627        let session = match self.session_handle().await {
628            Ok(session) => session,
629            Err(_) => {
630                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
631                return None;
632            }
633        };
634
635        // Some sessions might yield inside next_event, but just in case, yield here too
636        tokio::task::yield_now().await;
637        session.next_event().await
638    }
639
640    /// Send a tool response to the session.
641    ///
642    /// # Example
643    ///
644    /// ```rust,ignore
645    /// use adk_realtime::events::ToolResponse;
646    /// use serde_json::json;
647    ///
648    /// async fn example(runner: &adk_realtime::RealtimeRunner) {
649    ///     let response = ToolResponse {
650    ///         call_id: "call_123".to_string(),
651    ///         output: json!({"temperature": 72}),
652    ///     };
653    ///     runner.send_tool_response(response).await.unwrap();
654    /// }
655    /// ```
656    pub async fn send_tool_response(&self, response: ToolResponse) -> Result<()> {
657        let session = self.session_handle().await?;
658        session.send_tool_response(response).await
659    }
660
661    /// Run the event loop, processing events until disconnected.
662    pub async fn run(&self) -> Result<()> {
663        loop {
664            let session = self.session_handle().await?;
665            let old_session_id = session.session_id().to_string();
666            let event = session.next_event().await;
667
668            match event {
669                Some(Ok(event)) => {
670                    self.handle_event(event).await?;
671                }
672                Some(Err(e)) => {
673                    self.event_handler.on_error(&e).await?;
674                    return Err(e);
675                }
676                None => {
677                    // Session closed or swapped out. Check if a new session was installed (e.g., during reconnect).
678                    let current_session_id = self.session_id().await;
679                    if let Some(id) = current_session_id {
680                        if id != old_session_id {
681                            // A new session handle was installed concurrently. Continue polling.
682                            continue;
683                        }
684                    }
685                    // It was a real disconnect.
686                    break;
687                }
688            }
689        }
690        Ok(())
691    }
692
693    /// Process a single event.
694    async fn handle_event(&self, event: ServerEvent) -> Result<()> {
695        // Track state transitions before forwarding the event
696        match &event {
697            ServerEvent::ResponseCreated { .. } => {
698                let mut state = self.state.write().await;
699                if let RunnerState::Idle = *state {
700                    *state = RunnerState::Generating;
701                }
702            }
703            ServerEvent::FunctionCallDone { .. } => {
704                let mut state = self.state.write().await;
705                if let RunnerState::Generating | RunnerState::Idle = *state {
706                    *state = RunnerState::ExecutingTool;
707                }
708            }
709            _ => {}
710        }
711
712        match event {
713            ServerEvent::AudioDelta { delta, item_id, .. } => {
714                self.event_handler.on_audio(&delta, &item_id).await?;
715            }
716            ServerEvent::TextDelta { delta, item_id, .. } => {
717                self.event_handler.on_text(&delta, &item_id).await?;
718            }
719            ServerEvent::TranscriptDelta { delta, item_id, .. } => {
720                self.event_handler.on_transcript(&delta, &item_id).await?;
721            }
722            ServerEvent::SpeechStarted { audio_start_ms, .. } => {
723                self.event_handler.on_speech_started(audio_start_ms).await?;
724            }
725            ServerEvent::SpeechStopped { audio_end_ms, .. } => {
726                self.event_handler.on_speech_stopped(audio_end_ms).await?;
727            }
728            ServerEvent::ResponseDone { .. } => {
729                self.event_handler.on_response_done().await?;
730                self.check_resumption_queue().await?;
731            }
732            ServerEvent::FunctionCallDone { call_id, name, arguments, .. } => {
733                if self.runner_config.auto_execute_tools {
734                    self.execute_tool_call(&call_id, &name, &arguments).await?;
735                }
736            }
737            ServerEvent::SessionUpdated { session, .. } => {
738                // Check if the generic session update contains a resumption token
739                if let Some(token) = session.get("resumeToken").and_then(|t| t.as_str()) {
740                    tracing::info!(
741                        "Received Gemini sessionResumption token, saving for future reconnects."
742                    );
743                    let mut config = self.config.write().await;
744                    let mut extra = config.extra.clone().unwrap_or_else(|| serde_json::json!({}));
745                    extra["resumeToken"] = serde_json::Value::String(token.to_string());
746                    config.extra = Some(extra);
747                }
748            }
749            ServerEvent::Error { error, .. } => {
750                let err = RealtimeError::server(error.code.unwrap_or_default(), error.message);
751                self.event_handler.on_error(&err).await?;
752            }
753            _ => {
754                // Ignore other events
755            }
756        }
757        Ok(())
758    }
759
760    /// Safely transitions the runner back to Idle and executes any queued resumptions.
761    async fn check_resumption_queue(&self) -> Result<()> {
762        // 1. Acquire the state lock to inspect the queue.
763        let mut state = self.state.write().await;
764
765        // 2. Extract the pending configuration and attempt count if one exists.
766        let pending =
767            if let RunnerState::PendingResumption { config, bridge_message, attempts } = &*state {
768                Some((config.clone(), bridge_message.clone(), *attempts))
769            } else {
770                None
771            };
772
773        if let Some((config, bridge_message, attempts)) = pending {
774            tracing::info!(
775                "Executing queued resumption after turn completion. (Attempt {})",
776                attempts + 1
777            );
778
779            // 3. Mark the state as Idle so the background loop is unblocked.
780            *state = RunnerState::Idle;
781
782            // 4. Release the state lock *before* performing the heavy async socket connection.
783            drop(state);
784
785            // 5. Attempt the actual transport teardown/rebuild.
786            if let Err(e) = self.execute_resumption((*config).clone(), bridge_message.clone()).await
787            {
788                tracing::error!("Resumption failed: {}.", e);
789
790                // 6. If the reconnect fails (e.g., transient network error), re-acquire the lock
791                // to safely handle the retry logic without crashing the event loop.
792                let mut fallback_state = self.state.write().await;
793
794                // 7. Enforce a maximum retry budget to prevent infinite "hot-looping"
795                if attempts + 1 >= 3 {
796                    tracing::error!(
797                        "Maximum resumption attempts reached (3). Dropping queued mutation to prevent infinite loop."
798                    );
799                    *fallback_state = RunnerState::Idle;
800                } else {
801                    tracing::info!("Restoring pending queue state for retry.");
802                    *fallback_state = RunnerState::PendingResumption {
803                        config,
804                        bridge_message,
805                        attempts: attempts + 1,
806                    };
807                }
808
809                // 8. Do not return Err(e) here, as that would permanently kill the `run()` loop.
810                // Instead, report the error to the downstream handler and allow the event loop to continue spinning.
811                let _ = self.event_handler.on_error(&e).await;
812            }
813        } else {
814            // No resumptions were queued; simply mark as Idle.
815            *state = RunnerState::Idle;
816        }
817        Ok(())
818    }
819
820    /// Execute a tool call and optionally send the response.
821    async fn execute_tool_call(&self, call_id: &str, name: &str, arguments: &str) -> Result<()> {
822        let handler = self.tools.get(name).map(|(_, h)| h.clone());
823
824        let result = if let Some(handler) = handler {
825            let args: serde_json::Value = serde_json::from_str(arguments)
826                .unwrap_or(serde_json::Value::Object(Default::default()));
827
828            let call =
829                ToolCall { call_id: call_id.to_string(), name: name.to_string(), arguments: args };
830
831            match handler.execute(&call).await {
832                Ok(value) => value,
833                Err(e) => serde_json::json!({
834                    "error": e.to_string()
835                }),
836            }
837        } else {
838            serde_json::json!({
839                "error": format!("Unknown tool: {}", name)
840            })
841        };
842
843        if self.runner_config.auto_respond_tools {
844            let response = ToolResponse { call_id: call_id.to_string(), output: result };
845
846            if let Ok(session) = self.session_handle().await {
847                session.send_tool_response(response).await?;
848            }
849        }
850
851        Ok(())
852    }
853
854    /// Close the session.
855    pub async fn close(&self) -> Result<()> {
856        if let Ok(session) = self.session_handle().await {
857            session.close().await?;
858        }
859        Ok(())
860    }
861}