Skip to main content

adk_realtime/
runner.rs

1//! RealtimeRunner for integrating realtime sessions with agents.
2//!
3//! This module provides the bridge between realtime audio sessions and
4//! the ADK agent framework, handling tool execution and event routing.
5
6use crate::config::{RealtimeConfig, SessionUpdateConfig, ToolDefinition};
7use crate::error::{RealtimeError, Result};
8use crate::events::{ServerEvent, ToolCall, ToolResponse};
9use crate::model::BoxedModel;
10use crate::session::{BoxedSession, ContextMutationOutcome};
11use async_trait::async_trait;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// Internal state machine tracking the resumability status of the RealtimeRunner.
17#[derive(Debug, Clone, Default, PartialEq)]
18pub enum RunnerState {
19    /// Runner is ready to accept transport resumption immediately.
20    #[default]
21    Idle,
22    /// Model is currently generating a response; tearing down the connection would corrupt context.
23    Generating,
24    /// A tool is currently executing; teardown would cause tool loss.
25    ExecutingTool,
26    /// A context mutation was queued while the runner was busy, and must be executed once Idle.
27    ///
28    /// **Provider Context:** This state is only utilized by providers that do *not* support
29    /// native mid-flight mutability (e.g., Gemini Live), requiring a physical transport teardown
30    /// and rebuild (Phantom Reconnect). Providers like OpenAI natively apply `session.update`
31    /// frames instantly and will never enter this queued state.
32    ///
33    /// **Queue Policy:** The runner keeps only one pending resumption. If a new session update
34    /// arrives while a resumption is already pending, the previous pending resumption is replaced.
35    /// This is intentional: pending session updates represent desired end state, not an ordered
36    /// command queue. The policy is last write wins.
37    PendingResumption {
38        /// The new configuration to apply on reconnection.
39        config: Box<crate::config::RealtimeConfig>,
40        /// An optional message to inject immediately after resumption.
41        bridge_message: Option<String>,
42        /// Number of failed reconnection attempts for this mutation.
43        attempts: u8,
44    },
45}
46
47/// Handler for tool/function calls from the realtime model.
48#[async_trait]
49pub trait ToolHandler: Send + Sync {
50    /// Execute a tool call and return the result.
51    async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value>;
52}
53
54/// A simple function-based tool handler.
55pub struct FnToolHandler<F>
56where
57    F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
58{
59    handler: F,
60}
61
62impl<F> FnToolHandler<F>
63where
64    F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
65{
66    /// Create a new function-based tool handler.
67    pub fn new(handler: F) -> Self {
68        Self { handler }
69    }
70}
71
72#[async_trait]
73impl<F> ToolHandler for FnToolHandler<F>
74where
75    F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
76{
77    async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value> {
78        (self.handler)(call)
79    }
80}
81
82/// Async function-based tool handler.
83#[allow(dead_code)]
84pub struct AsyncToolHandler<F, Fut>
85where
86    F: Fn(ToolCall) -> Fut + Send + Sync,
87    Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
88{
89    handler: F,
90}
91
92impl<F, Fut> AsyncToolHandler<F, Fut>
93where
94    F: Fn(ToolCall) -> Fut + Send + Sync,
95    Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
96{
97    /// Create a new async tool handler.
98    pub fn new(handler: F) -> Self {
99        Self { handler }
100    }
101}
102
103/// Event handler for processing realtime events.
104#[async_trait]
105pub trait EventHandler: Send + Sync {
106    /// Called when an audio delta is received (raw PCM bytes).
107    async fn on_audio(&self, _audio: &[u8], _item_id: &str) -> Result<()> {
108        Ok(())
109    }
110
111    /// Called when a text delta is received.
112    async fn on_text(&self, _text: &str, _item_id: &str) -> Result<()> {
113        Ok(())
114    }
115
116    /// Called when a transcript delta is received.
117    async fn on_transcript(&self, _transcript: &str, _item_id: &str) -> Result<()> {
118        Ok(())
119    }
120
121    /// Called when speech is detected.
122    async fn on_speech_started(&self, _audio_start_ms: u64) -> Result<()> {
123        Ok(())
124    }
125
126    /// Called when speech ends.
127    async fn on_speech_stopped(&self, _audio_end_ms: u64) -> Result<()> {
128        Ok(())
129    }
130
131    /// Called when a response completes.
132    async fn on_response_done(&self) -> Result<()> {
133        Ok(())
134    }
135
136    /// Called on any error.
137    async fn on_error(&self, _error: &RealtimeError) -> Result<()> {
138        Ok(())
139    }
140}
141
142/// Default no-op event handler.
143#[derive(Debug, Clone, Default)]
144pub struct NoOpEventHandler;
145
146#[async_trait]
147impl EventHandler for NoOpEventHandler {}
148
/// Behavioural switches for the [`RealtimeRunner`].
#[derive(Clone)]
pub struct RunnerConfig {
    /// Execute tool calls automatically when the model requests them.
    pub auto_execute_tools: bool,
    /// Send tool responses back automatically.
    pub auto_respond_tools: bool,
    /// Upper bound on concurrently executing tools.
    pub max_concurrent_tools: usize,
}

impl Default for RunnerConfig {
    /// Defaults: tools are executed and answered automatically, at most four at a time.
    fn default() -> Self {
        RunnerConfig {
            auto_execute_tools: true,
            auto_respond_tools: true,
            max_concurrent_tools: 4,
        }
    }
}
165
166/// Builder for RealtimeRunner.
167pub struct RealtimeRunnerBuilder {
168    model: Option<BoxedModel>,
169    config: RealtimeConfig,
170    runner_config: RunnerConfig,
171    tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
172    event_handler: Option<Arc<dyn EventHandler>>,
173}
174
175impl Default for RealtimeRunnerBuilder {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
181impl RealtimeRunnerBuilder {
182    /// Create a new builder.
183    pub fn new() -> Self {
184        Self {
185            model: None,
186            config: RealtimeConfig::default(),
187            runner_config: RunnerConfig::default(),
188            tools: HashMap::new(),
189            event_handler: None,
190        }
191    }
192
193    /// Set the realtime model.
194    pub fn model(mut self, model: BoxedModel) -> Self {
195        self.model = Some(model);
196        self
197    }
198
199    /// Set the session configuration.
200    pub fn config(mut self, config: RealtimeConfig) -> Self {
201        self.config = config;
202        self
203    }
204
205    /// Set the runner configuration.
206    pub fn runner_config(mut self, config: RunnerConfig) -> Self {
207        self.runner_config = config;
208        self
209    }
210
211    /// Set the system instruction.
212    pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
213        self.config.instruction = Some(instruction.into());
214        self
215    }
216
217    /// Set the voice.
218    pub fn voice(mut self, voice: impl Into<String>) -> Self {
219        self.config.voice = Some(voice.into());
220        self
221    }
222
223    /// Register a tool with its handler.
224    pub fn tool(mut self, definition: ToolDefinition, handler: impl ToolHandler + 'static) -> Self {
225        let name = definition.name.clone();
226        self.tools.insert(name, (definition, Arc::new(handler)));
227        self
228    }
229
230    /// Register a tool with a sync function handler.
231    pub fn tool_fn<F>(self, definition: ToolDefinition, handler: F) -> Self
232    where
233        F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync + 'static,
234    {
235        self.tool(definition, FnToolHandler::new(handler))
236    }
237
238    /// Set the event handler.
239    pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
240        self.event_handler = Some(Arc::new(handler));
241        self
242    }
243
244    /// Build the runner (does not connect yet).
245    pub fn build(self) -> Result<RealtimeRunner> {
246        let model = self.model.ok_or_else(|| RealtimeError::config("Model is required"))?;
247
248        // Add tool definitions to config
249        let mut config = self.config;
250        if !self.tools.is_empty() {
251            let tool_defs: Vec<ToolDefinition> =
252                self.tools.values().map(|(def, _)| def.clone()).collect();
253            config.tools = Some(tool_defs);
254        }
255
256        Ok(RealtimeRunner {
257            model,
258            config: Arc::new(RwLock::new(config)),
259            runner_config: self.runner_config,
260            tools: self.tools,
261            event_handler: self.event_handler.unwrap_or_else(|| Arc::new(NoOpEventHandler)),
262            session: Arc::new(RwLock::new(None)),
263            state: Arc::new(RwLock::new(RunnerState::Idle)),
264        })
265    }
266}
267
268/// A runner that manages a realtime session with tool execution.
269///
270/// RealtimeRunner provides a high-level interface for:
271/// - Connecting to realtime providers
272/// - Automatically executing tool calls
273/// - Routing events to handlers
274/// - Managing the session lifecycle
275///
276/// # Example
277///
278/// ```rust,ignore
279/// use adk_realtime::{RealtimeRunner, RealtimeConfig, ToolDefinition};
280/// use adk_realtime::openai::OpenAIRealtimeModel;
281///
282/// #[tokio::main]
283/// async fn main() -> Result<()> {
284///     let model = OpenAIRealtimeModel::new(api_key, "gpt-4o-realtime-preview-2024-12-17");
285///
286///     let runner = RealtimeRunner::builder()
287///         .model(Box::new(model))
288///         .instruction("You are a helpful voice assistant.")
289///         .voice("alloy")
290///         .tool_fn(
291///             ToolDefinition::new("get_weather")
292///                 .with_description("Get weather for a location"),
293///             |call| {
294///                 Ok(serde_json::json!({"temperature": 72, "condition": "sunny"}))
295///             }
296///         )
297///         .build()?;
298///
299///     runner.connect().await?;
300///     runner.run().await?;
301///
302///     Ok(())
303/// }
304/// ```
305pub struct RealtimeRunner {
306    model: BoxedModel,
307    config: Arc<RwLock<RealtimeConfig>>,
308    runner_config: RunnerConfig,
309    tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
310    event_handler: Arc<dyn EventHandler>,
311    session: Arc<RwLock<Option<BoxedSession>>>,
312    state: Arc<RwLock<RunnerState>>,
313}
314
315impl RealtimeRunner {
316    /// Create a new builder.
317    pub fn builder() -> RealtimeRunnerBuilder {
318        RealtimeRunnerBuilder::new()
319    }
320
321    /// Connect to the realtime provider.
322    pub async fn connect(&self) -> Result<()> {
323        let config = self.config.read().await.clone();
324        let session = self.model.connect(config).await?;
325        let mut guard = self.session.write().await;
326        *guard = Some(session);
327        Ok(())
328    }
329
330    /// Check if currently connected.
331    pub async fn is_connected(&self) -> bool {
332        let guard = self.session.read().await;
333        guard.as_ref().map(|s| s.is_connected()).unwrap_or(false)
334    }
335
336    /// Get the session ID if connected.
337    pub async fn session_id(&self) -> Option<String> {
338        let guard = self.session.read().await;
339        guard.as_ref().map(|s| s.session_id().to_string())
340    }
341
342    /// Send a client event directly to the session.
343    ///
344    /// This method intercepts internal control-plane events (like `UpdateSession`) to route
345    /// them through the provider-agnostic orchestration layer instead of forwarding raw JSON
346    /// to the underlying WebSocket transport. This guarantees that `adk-realtime` never leaks
347    /// invalid event payloads to providers (e.g., OpenAI or Gemini) and universally bridges
348    /// the Cognitive Handoff mechanics transparently.
349    pub async fn send_client_event(&self, event: crate::events::ClientEvent) -> Result<()> {
350        match event {
351            crate::events::ClientEvent::UpdateSession { instructions, tools } => {
352                let update_config = SessionUpdateConfig(crate::config::RealtimeConfig {
353                    instruction: instructions,
354                    tools,
355                    ..Default::default()
356                });
357                self.update_session(update_config).await
358            }
359            other => {
360                let guard = self.session.read().await;
361                let session =
362                    guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
363                session.send_event(other).await
364            }
365        }
366    }
367
368    /// Internal helper to merge a `SessionUpdateConfig` delta into the canonical `RealtimeConfig` state.
369    ///
370    /// **Why this exists**: The `RealtimeRunner` must maintain an absolute, single source of truth
371    /// for its configuration (`self.config`). Orchestrators fire `SessionUpdateConfig`s as sparse
372    /// partial deltas (intents to hot-swap instructions or tools mid-flight). By accumulating
373    /// these sparse updates into the single `base` config, any subsequent "Phantom Reconnect"
374    /// (e.g., due to a Gemini domain shift or an unexpected network drop) natively inherits all
375    /// prior hot-swaps alongside the immutable transport parameters (like sample rates) defined at startup.
376    ///
377    /// Note: This is intentionally narrow and specifically scoped to merge only
378    /// hot-swappable cognitive fields (instruction, tools, voice, temperature, extra).
379    /// Transport-level attributes like sample rates and audio formats are not dynamically swappable.
380    fn merge_config(base: &mut RealtimeConfig, update: &SessionUpdateConfig) {
381        if let Some(instruction) = &update.0.instruction {
382            base.instruction = Some(instruction.clone());
383        }
384        if let Some(tools) = &update.0.tools {
385            base.tools = Some(tools.clone());
386        }
387        if let Some(voice) = &update.0.voice {
388            base.voice = Some(voice.clone());
389        }
390        if let Some(temp) = update.0.temperature {
391            base.temperature = Some(temp);
392        }
393        if let Some(extra) = &update.0.extra {
394            base.extra = Some(extra.clone());
395        }
396    }
397
398    /// Update the session configuration.
399    ///
400    /// Delegates to [`Self::update_session_with_bridge`] with no bridge message.
401    ///
402    /// # Example
403    ///
404    /// ```rust,ignore
405    /// use adk_realtime::config::{SessionUpdateConfig, RealtimeConfig};
406    ///
407    /// async fn example(runner: &adk_realtime::RealtimeRunner) {
408    ///     let update = SessionUpdateConfig(
409    ///         RealtimeConfig::default().with_instruction("You are now a pirate.")
410    ///     );
411    ///     runner.update_session(update).await.unwrap();
412    /// }
413    /// ```
414    pub async fn update_session(&self, config: SessionUpdateConfig) -> Result<()> {
415        self.update_session_with_bridge(config, None).await
416    }
417
418    /// Update the session configuration, optionally injecting a bridge message if
419    /// a transport resumption (Phantom Reconnect) occurs.
420    ///
421    /// The RealtimeRunner will attempt to mutate the session natively if the underlying
422    /// API supports it (e.g., OpenAI). If it does not (e.g., Gemini), the Runner will
423    /// queue a transport resumption, executing it only when the session
424    /// is in a resumable state (Idle) to prevent data corruption.
425    ///
426    /// The runner keeps only one pending resumption. If a new session update arrives while
427    /// a resumption is already pending, the previous pending resumption is replaced. This is
428    /// intentional: pending session updates represent desired end state, not an ordered command queue.
429    /// The policy is last write wins.
430    pub async fn update_session_with_bridge(
431        &self,
432        config: SessionUpdateConfig,
433        bridge_message: Option<String>,
434    ) -> Result<()> {
435        // 1. Merge the incoming delta into the runner's canonical, persisted configuration.
436        // This ensures that any future reconnects (e.g., due to network drops) naturally
437        // inherit this latest state.
438        let mut full_config = self.config.write().await;
439        Self::merge_config(&mut full_config, &config);
440
441        let cloned_config = full_config.clone();
442        drop(full_config); // Free the write lock early to avoid deadlocks.
443
444        // 2. Obtain a read lock on the active session transport to attempt the mutation.
445        let guard = self.session.read().await;
446        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
447
448        // 3. Delegate the mutation attempt to the provider-specific adapter.
449        match session.mutate_context(cloned_config).await? {
450            // PATH A: Native Mutability (e.g., OpenAI)
451            // The provider natively updated the context over the active WebSocket.
452            ContextMutationOutcome::Applied => {
453                tracing::info!("Context mutated natively mid-flight.");
454
455                // Since the transport wasn't dropped, we can inject the bridge message
456                // immediately as a standard user message to update the model's short-term memory.
457                if let Some(msg) = bridge_message {
458                    let event = crate::events::ClientEvent::Message {
459                        role: "user".to_string(),
460                        parts: vec![adk_core::types::Part::Text { text: msg }],
461                    };
462                    session.send_event(event).await?;
463                }
464                Ok(())
465            }
466
467            // PATH B: Rigid Initialization (e.g., Gemini)
468            // The provider requires us to tear down the WebSocket and establish a new one (Phantom Reconnect).
469            ContextMutationOutcome::RequiresResumption(new_config) => {
470                drop(guard); // CRITICAL: Release the read lock on the session before we attempt to mutate it or acquire state locks.
471
472                // 4. Check the Runner's internal state machine to ensure it is safe to tear down the socket.
473                let mut state_guard = self.state.write().await;
474
475                if *state_guard == RunnerState::Idle {
476                    // Safe to reconnect: The model is neither generating audio nor executing a tool.
477                    drop(state_guard); // Free state lock before the heavy async network operation.
478                    tracing::info!("Runner is idle. Executing resumption immediately.");
479
480                    if let Err(e) =
481                        self.execute_resumption((*new_config).clone(), bridge_message.clone()).await
482                    {
483                        tracing::error!("Immediate resumption failed: {}. Queueing for retry.", e);
484                        // If the reconnect fails (e.g., transient network issue), we must not lose the mutation intent.
485                        // We push it back into the queue for the background loop to retry.
486                        let mut fallback_state = self.state.write().await;
487                        *fallback_state = RunnerState::PendingResumption {
488                            config: Box::new(*new_config),
489                            bridge_message,
490                            attempts: 1,
491                        };
492                        return Err(e);
493                    }
494                } else {
495                    // Unsafe to reconnect: Tearing down the socket now would corrupt the in-flight context.
496                    // We must queue the mutation. The event loop will execute it once it returns to Idle.
497                    if let RunnerState::PendingResumption { .. } = *state_guard {
498                        tracing::warn!(
499                            "Runner already had a pending resumption. Overwriting with last-write-wins policy."
500                        );
501                    } else {
502                        tracing::info!("Runner is busy ({:?}). Queueing resumption.", *state_guard);
503                    }
504
505                    // Queue the intent using a last-write-wins policy.
506                    *state_guard = RunnerState::PendingResumption {
507                        config: new_config,
508                        bridge_message,
509                        attempts: 0,
510                    };
511                }
512                Ok(())
513            }
514        }
515    }
516
517    /// Internal helper to execute a transport resumption (teardown and rebuild).
518    async fn execute_resumption(
519        &self,
520        new_config: crate::config::RealtimeConfig,
521        bridge_message: Option<String>,
522    ) -> Result<()> {
523        tracing::warn!("Executing transport resumption with new configuration.");
524
525        // 1. Acquire exclusive write access to the session pointer.
526        let mut write_guard = self.session.write().await;
527
528        // 2. Explicitly tear down the old WebSocket connection to release upstream resources.
529        if let Some(old_session) = write_guard.as_ref() {
530            if let Err(e) = old_session.close().await {
531                tracing::warn!("Failed to cleanly close old session during resumption: {}", e);
532            }
533        }
534
535        // 3. Establish a brand new connection using the provider-agnostic factory interface.
536        // If the provider supports resumption natively (like Gemini), the `new_config`
537        // payload already contains the cached `resumeToken`.
538        let new_session = self.model.connect(new_config).await?;
539
540        // 4. Overwrite the active session pointer with the newly connected transport.
541        *write_guard = Some(new_session);
542
543        // 5. Release the write lock immediately before attempting to inject any new messages.
544        drop(write_guard);
545
546        // 6. If the orchestrator provided a bridge message (e.g. to explain the domain shift),
547        // safely inject it into the new connection's context window.
548        if let Some(msg) = bridge_message {
549            self.inject_bridge_message(msg).await?;
550        }
551
552        tracing::info!("Resumption complete. New transport established.");
553        Ok(())
554    }
555
556    /// Internal helper to safely inject a bridge message directly into the active session.
557    ///
558    /// This intentionally bypasses the `send_client_event` router to avoid `E0733`
559    /// (un-Boxed async recursion) where `send_client_event` -> `update_session` ->
560    /// `execute_resumption` -> `send_client_event` creates an infinite compiler loop.
561    async fn inject_bridge_message(&self, msg: String) -> Result<()> {
562        tracing::info!("Injecting bridge message post-resumption.");
563        let event = crate::events::ClientEvent::Message {
564            role: "user".to_string(),
565            parts: vec![adk_core::types::Part::Text { text: msg }],
566        };
567        let guard = self.session.read().await;
568        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
569        session.send_event(event).await
570    }
571
572    /// Send audio to the session.
573    pub async fn send_audio(&self, audio_base64: &str) -> Result<()> {
574        let guard = self.session.read().await;
575        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
576        session.send_audio_base64(audio_base64).await
577    }
578
579    /// Send text to the session.
580    pub async fn send_text(&self, text: &str) -> Result<()> {
581        let guard = self.session.read().await;
582        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
583        session.send_text(text).await
584    }
585
586    /// Commit the audio buffer (for manual VAD mode).
587    pub async fn commit_audio(&self) -> Result<()> {
588        let guard = self.session.read().await;
589        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
590        session.commit_audio().await
591    }
592
593    /// Trigger a response from the model.
594    pub async fn create_response(&self) -> Result<()> {
595        let guard = self.session.read().await;
596        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
597        session.create_response().await
598    }
599
600    /// Interrupt the current response.
601    pub async fn interrupt(&self) -> Result<()> {
602        let guard = self.session.read().await;
603        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
604        session.interrupt().await
605    }
606
607    /// Get the next raw event from the session.
608    ///
609    /// # Example
610    ///
611    /// ```rust,ignore
612    /// use adk_realtime::events::ServerEvent;
613    /// use tracing::{info, error};
614    ///
615    /// async fn process_events(runner: &adk_realtime::RealtimeRunner) {
616    ///     while let Some(event) = runner.next_event().await {
617    ///         match event {
618    ///             Ok(ServerEvent::SpeechStarted { .. }) => info!("User is speaking"),
619    ///             Ok(_) => info!("Received other event"),
620    ///             Err(e) => error!("Error: {e}"),
621    ///         }
622    ///     }
623    /// }
624    /// ```
625    pub async fn next_event(&self) -> Option<Result<ServerEvent>> {
626        let guard = self.session.read().await;
627        if let Some(session) = guard.as_ref() {
628            // Some sessions might yield inside next_event, but just in case, yield here too
629            tokio::task::yield_now().await;
630            session.next_event().await
631        } else {
632            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
633            None
634        }
635    }
636
637    /// Send a tool response to the session.
638    ///
639    /// # Example
640    ///
641    /// ```rust,ignore
642    /// use adk_realtime::events::ToolResponse;
643    /// use serde_json::json;
644    ///
645    /// async fn example(runner: &adk_realtime::RealtimeRunner) {
646    ///     let response = ToolResponse {
647    ///         call_id: "call_123".to_string(),
648    ///         output: json!({"temperature": 72}),
649    ///     };
650    ///     runner.send_tool_response(response).await.unwrap();
651    /// }
652    /// ```
653    pub async fn send_tool_response(&self, response: ToolResponse) -> Result<()> {
654        let guard = self.session.read().await;
655        let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
656        session.send_tool_response(response).await
657    }
658
659    /// Run the event loop, processing events until disconnected.
660    pub async fn run(&self) -> Result<()> {
661        loop {
662            let event = {
663                let guard = self.session.read().await;
664                let session =
665                    guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
666                session.next_event().await
667            };
668
669            match event {
670                Some(Ok(event)) => {
671                    self.handle_event(event).await?;
672                }
673                Some(Err(e)) => {
674                    self.event_handler.on_error(&e).await?;
675                    return Err(e);
676                }
677                None => {
678                    // Session closed
679                    break;
680                }
681            }
682        }
683        Ok(())
684    }
685
686    /// Process a single event.
687    async fn handle_event(&self, event: ServerEvent) -> Result<()> {
688        // Track state transitions before forwarding the event
689        match &event {
690            ServerEvent::ResponseCreated { .. } => {
691                let mut state = self.state.write().await;
692                if let RunnerState::Idle = *state {
693                    *state = RunnerState::Generating;
694                }
695            }
696            ServerEvent::FunctionCallDone { .. } => {
697                let mut state = self.state.write().await;
698                if let RunnerState::Generating | RunnerState::Idle = *state {
699                    *state = RunnerState::ExecutingTool;
700                }
701            }
702            _ => {}
703        }
704
705        match event {
706            ServerEvent::AudioDelta { delta, item_id, .. } => {
707                self.event_handler.on_audio(&delta, &item_id).await?;
708            }
709            ServerEvent::TextDelta { delta, item_id, .. } => {
710                self.event_handler.on_text(&delta, &item_id).await?;
711            }
712            ServerEvent::TranscriptDelta { delta, item_id, .. } => {
713                self.event_handler.on_transcript(&delta, &item_id).await?;
714            }
715            ServerEvent::SpeechStarted { audio_start_ms, .. } => {
716                self.event_handler.on_speech_started(audio_start_ms).await?;
717            }
718            ServerEvent::SpeechStopped { audio_end_ms, .. } => {
719                self.event_handler.on_speech_stopped(audio_end_ms).await?;
720            }
721            ServerEvent::ResponseDone { .. } => {
722                self.event_handler.on_response_done().await?;
723                self.check_resumption_queue().await?;
724            }
725            ServerEvent::FunctionCallDone { call_id, name, arguments, .. } => {
726                if self.runner_config.auto_execute_tools {
727                    self.execute_tool_call(&call_id, &name, &arguments).await?;
728                }
729            }
730            ServerEvent::SessionUpdated { session, .. } => {
731                // Check if the generic session update contains a resumption token
732                if let Some(token) = session.get("resumeToken").and_then(|t| t.as_str()) {
733                    tracing::info!(
734                        "Received Gemini sessionResumption token, saving for future reconnects."
735                    );
736                    let mut config = self.config.write().await;
737                    let mut extra = config.extra.clone().unwrap_or_else(|| serde_json::json!({}));
738                    extra["resumeToken"] = serde_json::Value::String(token.to_string());
739                    config.extra = Some(extra);
740                }
741            }
742            ServerEvent::Error { error, .. } => {
743                let err = RealtimeError::server(error.code.unwrap_or_default(), error.message);
744                self.event_handler.on_error(&err).await?;
745            }
746            _ => {
747                // Ignore other events
748            }
749        }
750        Ok(())
751    }
752
753    /// Safely transitions the runner back to Idle and executes any queued resumptions.
754    async fn check_resumption_queue(&self) -> Result<()> {
755        // 1. Acquire the state lock to inspect the queue.
756        let mut state = self.state.write().await;
757
758        // 2. Extract the pending configuration and attempt count if one exists.
759        let pending =
760            if let RunnerState::PendingResumption { config, bridge_message, attempts } = &*state {
761                Some((config.clone(), bridge_message.clone(), *attempts))
762            } else {
763                None
764            };
765
766        if let Some((config, bridge_message, attempts)) = pending {
767            tracing::info!(
768                "Executing queued resumption after turn completion. (Attempt {})",
769                attempts + 1
770            );
771
772            // 3. Mark the state as Idle so the background loop is unblocked.
773            *state = RunnerState::Idle;
774
775            // 4. Release the state lock *before* performing the heavy async socket connection.
776            drop(state);
777
778            // 5. Attempt the actual transport teardown/rebuild.
779            if let Err(e) = self.execute_resumption((*config).clone(), bridge_message.clone()).await
780            {
781                tracing::error!("Resumption failed: {}.", e);
782
783                // 6. If the reconnect fails (e.g., transient network error), re-acquire the lock
784                // to safely handle the retry logic without crashing the event loop.
785                let mut fallback_state = self.state.write().await;
786
787                // 7. Enforce a maximum retry budget to prevent infinite "hot-looping"
788                if attempts + 1 >= 3 {
789                    tracing::error!(
790                        "Maximum resumption attempts reached (3). Dropping queued mutation to prevent infinite loop."
791                    );
792                    *fallback_state = RunnerState::Idle;
793                } else {
794                    tracing::info!("Restoring pending queue state for retry.");
795                    *fallback_state = RunnerState::PendingResumption {
796                        config,
797                        bridge_message,
798                        attempts: attempts + 1,
799                    };
800                }
801
802                // 8. Do not return Err(e) here, as that would permanently kill the `run()` loop.
803                // Instead, report the error to the downstream handler and allow the event loop to continue spinning.
804                let _ = self.event_handler.on_error(&e).await;
805            }
806        } else {
807            // No resumptions were queued; simply mark as Idle.
808            *state = RunnerState::Idle;
809        }
810        Ok(())
811    }
812
813    /// Execute a tool call and optionally send the response.
814    async fn execute_tool_call(&self, call_id: &str, name: &str, arguments: &str) -> Result<()> {
815        let handler = self.tools.get(name).map(|(_, h)| h.clone());
816
817        let result = if let Some(handler) = handler {
818            let args: serde_json::Value = serde_json::from_str(arguments)
819                .unwrap_or(serde_json::Value::Object(Default::default()));
820
821            let call =
822                ToolCall { call_id: call_id.to_string(), name: name.to_string(), arguments: args };
823
824            match handler.execute(&call).await {
825                Ok(value) => value,
826                Err(e) => serde_json::json!({
827                    "error": e.to_string()
828                }),
829            }
830        } else {
831            serde_json::json!({
832                "error": format!("Unknown tool: {}", name)
833            })
834        };
835
836        if self.runner_config.auto_respond_tools {
837            let response = ToolResponse { call_id: call_id.to_string(), output: result };
838
839            let guard = self.session.read().await;
840            if let Some(session) = guard.as_ref() {
841                session.send_tool_response(response).await?;
842            }
843        }
844
845        Ok(())
846    }
847
848    /// Close the session.
849    pub async fn close(&self) -> Result<()> {
850        let guard = self.session.read().await;
851        if let Some(session) = guard.as_ref() {
852            session.close().await?;
853        }
854        Ok(())
855    }
856}