adk_realtime/runner.rs
1//! RealtimeRunner for integrating realtime sessions with agents.
2//!
3//! This module provides the bridge between realtime audio sessions and
4//! the ADK agent framework, handling tool execution and event routing.
5
6use crate::config::{RealtimeConfig, SessionUpdateConfig, ToolDefinition};
7use crate::error::{RealtimeError, Result};
8use crate::events::{ServerEvent, ToolCall, ToolResponse};
9use crate::model::BoxedModel;
10use crate::session::{BoxedSession, ContextMutationOutcome};
11use async_trait::async_trait;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// Internal state machine tracking the resumability status of the RealtimeRunner.
17#[derive(Debug, Clone, Default, PartialEq)]
18pub enum RunnerState {
19 /// Runner is ready to accept transport resumption immediately.
20 #[default]
21 Idle,
22 /// Model is currently generating a response; tearing down the connection would corrupt context.
23 Generating,
24 /// A tool is currently executing; teardown would cause tool loss.
25 ExecutingTool,
26 /// A context mutation was queued while the runner was busy, and must be executed once Idle.
27 ///
28 /// **Provider Context:** This state is only utilized by providers that do *not* support
29 /// native mid-flight mutability (e.g., Gemini Live), requiring a physical transport teardown
30 /// and rebuild (Phantom Reconnect). Providers like OpenAI natively apply `session.update`
31 /// frames instantly and will never enter this queued state.
32 ///
33 /// **Queue Policy:** The runner keeps only one pending resumption. If a new session update
34 /// arrives while a resumption is already pending, the previous pending resumption is replaced.
35 /// This is intentional: pending session updates represent desired end state, not an ordered
36 /// command queue. The policy is last write wins.
37 PendingResumption {
38 /// The new configuration to apply on reconnection.
39 config: Box<crate::config::RealtimeConfig>,
40 /// An optional message to inject immediately after resumption.
41 bridge_message: Option<String>,
42 /// Number of failed reconnection attempts for this mutation.
43 attempts: u8,
44 },
45}
46
47/// Handler for tool/function calls from the realtime model.
48#[async_trait]
49pub trait ToolHandler: Send + Sync {
50 /// Execute a tool call and return the result.
51 async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value>;
52}
53
54/// A simple function-based tool handler.
55pub struct FnToolHandler<F>
56where
57 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
58{
59 handler: F,
60}
61
62impl<F> FnToolHandler<F>
63where
64 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
65{
66 /// Create a new function-based tool handler.
67 pub fn new(handler: F) -> Self {
68 Self { handler }
69 }
70}
71
72#[async_trait]
73impl<F> ToolHandler for FnToolHandler<F>
74where
75 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync,
76{
77 async fn execute(&self, call: &ToolCall) -> Result<serde_json::Value> {
78 (self.handler)(call)
79 }
80}
81
82/// Async function-based tool handler.
83#[allow(dead_code)]
84pub struct AsyncToolHandler<F, Fut>
85where
86 F: Fn(ToolCall) -> Fut + Send + Sync,
87 Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
88{
89 handler: F,
90}
91
92impl<F, Fut> AsyncToolHandler<F, Fut>
93where
94 F: Fn(ToolCall) -> Fut + Send + Sync,
95 Fut: std::future::Future<Output = Result<serde_json::Value>> + Send,
96{
97 /// Create a new async tool handler.
98 pub fn new(handler: F) -> Self {
99 Self { handler }
100 }
101}
102
103/// Event handler for processing realtime events.
104#[async_trait]
105pub trait EventHandler: Send + Sync {
106 /// Called when an audio delta is received (raw PCM bytes).
107 async fn on_audio(&self, _audio: &[u8], _item_id: &str) -> Result<()> {
108 Ok(())
109 }
110
111 /// Called when a text delta is received.
112 async fn on_text(&self, _text: &str, _item_id: &str) -> Result<()> {
113 Ok(())
114 }
115
116 /// Called when a transcript delta is received.
117 async fn on_transcript(&self, _transcript: &str, _item_id: &str) -> Result<()> {
118 Ok(())
119 }
120
121 /// Called when speech is detected.
122 async fn on_speech_started(&self, _audio_start_ms: u64) -> Result<()> {
123 Ok(())
124 }
125
126 /// Called when speech ends.
127 async fn on_speech_stopped(&self, _audio_end_ms: u64) -> Result<()> {
128 Ok(())
129 }
130
131 /// Called when a response completes.
132 async fn on_response_done(&self) -> Result<()> {
133 Ok(())
134 }
135
136 /// Called on any error.
137 async fn on_error(&self, _error: &RealtimeError) -> Result<()> {
138 Ok(())
139 }
140}
141
142/// Default no-op event handler.
143#[derive(Debug, Clone, Default)]
144pub struct NoOpEventHandler;
145
146#[async_trait]
147impl EventHandler for NoOpEventHandler {}
148
/// Behavioral switches for the `RealtimeRunner`.
///
/// `Debug`, `PartialEq` and `Eq` are derived so the configuration can be logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunnerConfig {
    /// Whether to automatically execute tool calls.
    pub auto_execute_tools: bool,
    /// Whether to automatically send tool responses.
    pub auto_respond_tools: bool,
    /// Maximum concurrent tool executions.
    pub max_concurrent_tools: usize,
}

impl Default for RunnerConfig {
    /// Tools are executed and answered automatically, with a concurrency limit of 4.
    fn default() -> Self {
        Self {
            auto_execute_tools: true,
            auto_respond_tools: true,
            max_concurrent_tools: 4,
        }
    }
}
165
166/// Builder for RealtimeRunner.
167pub struct RealtimeRunnerBuilder {
168 model: Option<BoxedModel>,
169 config: RealtimeConfig,
170 runner_config: RunnerConfig,
171 tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
172 event_handler: Option<Arc<dyn EventHandler>>,
173}
174
175impl Default for RealtimeRunnerBuilder {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
181impl RealtimeRunnerBuilder {
182 /// Create a new builder.
183 pub fn new() -> Self {
184 Self {
185 model: None,
186 config: RealtimeConfig::default(),
187 runner_config: RunnerConfig::default(),
188 tools: HashMap::new(),
189 event_handler: None,
190 }
191 }
192
193 /// Set the realtime model.
194 pub fn model(mut self, model: BoxedModel) -> Self {
195 self.model = Some(model);
196 self
197 }
198
199 /// Set the session configuration.
200 pub fn config(mut self, config: RealtimeConfig) -> Self {
201 self.config = config;
202 self
203 }
204
205 /// Set the runner configuration.
206 pub fn runner_config(mut self, config: RunnerConfig) -> Self {
207 self.runner_config = config;
208 self
209 }
210
211 /// Set the system instruction.
212 pub fn instruction(mut self, instruction: impl Into<String>) -> Self {
213 self.config.instruction = Some(instruction.into());
214 self
215 }
216
217 /// Set the voice.
218 pub fn voice(mut self, voice: impl Into<String>) -> Self {
219 self.config.voice = Some(voice.into());
220 self
221 }
222
223 /// Register a tool with its handler.
224 pub fn tool(mut self, definition: ToolDefinition, handler: impl ToolHandler + 'static) -> Self {
225 let name = definition.name.clone();
226 self.tools.insert(name, (definition, Arc::new(handler)));
227 self
228 }
229
230 /// Register a tool with a sync function handler.
231 pub fn tool_fn<F>(self, definition: ToolDefinition, handler: F) -> Self
232 where
233 F: Fn(&ToolCall) -> Result<serde_json::Value> + Send + Sync + 'static,
234 {
235 self.tool(definition, FnToolHandler::new(handler))
236 }
237
238 /// Set the event handler.
239 pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
240 self.event_handler = Some(Arc::new(handler));
241 self
242 }
243
244 /// Build the runner (does not connect yet).
245 pub fn build(self) -> Result<RealtimeRunner> {
246 let model = self.model.ok_or_else(|| RealtimeError::config("Model is required"))?;
247
248 // Add tool definitions to config
249 let mut config = self.config;
250 if !self.tools.is_empty() {
251 let tool_defs: Vec<ToolDefinition> =
252 self.tools.values().map(|(def, _)| def.clone()).collect();
253 config.tools = Some(tool_defs);
254 }
255
256 Ok(RealtimeRunner {
257 model,
258 config: Arc::new(RwLock::new(config)),
259 runner_config: self.runner_config,
260 tools: self.tools,
261 event_handler: self.event_handler.unwrap_or_else(|| Arc::new(NoOpEventHandler)),
262 session: Arc::new(RwLock::new(None)),
263 state: Arc::new(RwLock::new(RunnerState::Idle)),
264 })
265 }
266}
267
268/// A runner that manages a realtime session with tool execution.
269///
270/// RealtimeRunner provides a high-level interface for:
271/// - Connecting to realtime providers
272/// - Automatically executing tool calls
273/// - Routing events to handlers
274/// - Managing the session lifecycle
275///
276/// # Example
277///
278/// ```rust,ignore
279/// use adk_realtime::{RealtimeRunner, RealtimeConfig, ToolDefinition};
280/// use adk_realtime::openai::OpenAIRealtimeModel;
281///
282/// #[tokio::main]
283/// async fn main() -> Result<()> {
284/// let model = OpenAIRealtimeModel::new(api_key, "gpt-4o-realtime-preview-2024-12-17");
285///
286/// let runner = RealtimeRunner::builder()
287/// .model(Box::new(model))
288/// .instruction("You are a helpful voice assistant.")
289/// .voice("alloy")
290/// .tool_fn(
291/// ToolDefinition::new("get_weather")
292/// .with_description("Get weather for a location"),
293/// |call| {
294/// Ok(serde_json::json!({"temperature": 72, "condition": "sunny"}))
295/// }
296/// )
297/// .build()?;
298///
299/// runner.connect().await?;
300/// runner.run().await?;
301///
302/// Ok(())
303/// }
304/// ```
305pub struct RealtimeRunner {
306 model: BoxedModel,
307 config: Arc<RwLock<RealtimeConfig>>,
308 runner_config: RunnerConfig,
309 tools: HashMap<String, (ToolDefinition, Arc<dyn ToolHandler>)>,
310 event_handler: Arc<dyn EventHandler>,
311 session: Arc<RwLock<Option<BoxedSession>>>,
312 state: Arc<RwLock<RunnerState>>,
313}
314
315impl RealtimeRunner {
316 /// Create a new builder.
317 pub fn builder() -> RealtimeRunnerBuilder {
318 RealtimeRunnerBuilder::new()
319 }
320
321 /// Connect to the realtime provider.
322 pub async fn connect(&self) -> Result<()> {
323 let config = self.config.read().await.clone();
324 let session = self.model.connect(config).await?;
325 let mut guard = self.session.write().await;
326 *guard = Some(session);
327 Ok(())
328 }
329
330 /// Check if currently connected.
331 pub async fn is_connected(&self) -> bool {
332 let guard = self.session.read().await;
333 guard.as_ref().map(|s| s.is_connected()).unwrap_or(false)
334 }
335
336 /// Get the session ID if connected.
337 pub async fn session_id(&self) -> Option<String> {
338 let guard = self.session.read().await;
339 guard.as_ref().map(|s| s.session_id().to_string())
340 }
341
342 /// Send a client event directly to the session.
343 ///
344 /// This method intercepts internal control-plane events (like `UpdateSession`) to route
345 /// them through the provider-agnostic orchestration layer instead of forwarding raw JSON
346 /// to the underlying WebSocket transport. This guarantees that `adk-realtime` never leaks
347 /// invalid event payloads to providers (e.g., OpenAI or Gemini) and universally bridges
348 /// the Cognitive Handoff mechanics transparently.
349 pub async fn send_client_event(&self, event: crate::events::ClientEvent) -> Result<()> {
350 match event {
351 crate::events::ClientEvent::UpdateSession { instructions, tools } => {
352 let update_config = SessionUpdateConfig(crate::config::RealtimeConfig {
353 instruction: instructions,
354 tools,
355 ..Default::default()
356 });
357 self.update_session(update_config).await
358 }
359 other => {
360 let guard = self.session.read().await;
361 let session =
362 guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
363 session.send_event(other).await
364 }
365 }
366 }
367
368 /// Internal helper to merge a `SessionUpdateConfig` delta into the canonical `RealtimeConfig` state.
369 ///
370 /// **Why this exists**: The `RealtimeRunner` must maintain an absolute, single source of truth
371 /// for its configuration (`self.config`). Orchestrators fire `SessionUpdateConfig`s as sparse
372 /// partial deltas (intents to hot-swap instructions or tools mid-flight). By accumulating
373 /// these sparse updates into the single `base` config, any subsequent "Phantom Reconnect"
374 /// (e.g., due to a Gemini domain shift or an unexpected network drop) natively inherits all
375 /// prior hot-swaps alongside the immutable transport parameters (like sample rates) defined at startup.
376 ///
377 /// Note: This is intentionally narrow and specifically scoped to merge only
378 /// hot-swappable cognitive fields (instruction, tools, voice, temperature, extra).
379 /// Transport-level attributes like sample rates and audio formats are not dynamically swappable.
380 fn merge_config(base: &mut RealtimeConfig, update: &SessionUpdateConfig) {
381 if let Some(instruction) = &update.0.instruction {
382 base.instruction = Some(instruction.clone());
383 }
384 if let Some(tools) = &update.0.tools {
385 base.tools = Some(tools.clone());
386 }
387 if let Some(voice) = &update.0.voice {
388 base.voice = Some(voice.clone());
389 }
390 if let Some(temp) = update.0.temperature {
391 base.temperature = Some(temp);
392 }
393 if let Some(extra) = &update.0.extra {
394 base.extra = Some(extra.clone());
395 }
396 }
397
398 /// Update the session configuration.
399 ///
400 /// Delegates to [`Self::update_session_with_bridge`] with no bridge message.
401 ///
402 /// # Example
403 ///
404 /// ```rust,ignore
405 /// use adk_realtime::config::{SessionUpdateConfig, RealtimeConfig};
406 ///
407 /// async fn example(runner: &adk_realtime::RealtimeRunner) {
408 /// let update = SessionUpdateConfig(
409 /// RealtimeConfig::default().with_instruction("You are now a pirate.")
410 /// );
411 /// runner.update_session(update).await.unwrap();
412 /// }
413 /// ```
414 pub async fn update_session(&self, config: SessionUpdateConfig) -> Result<()> {
415 self.update_session_with_bridge(config, None).await
416 }
417
418 /// Update the session configuration, optionally injecting a bridge message if
419 /// a transport resumption (Phantom Reconnect) occurs.
420 ///
421 /// The RealtimeRunner will attempt to mutate the session natively if the underlying
422 /// API supports it (e.g., OpenAI). If it does not (e.g., Gemini), the Runner will
423 /// queue a transport resumption, executing it only when the session
424 /// is in a resumable state (Idle) to prevent data corruption.
425 ///
426 /// The runner keeps only one pending resumption. If a new session update arrives while
427 /// a resumption is already pending, the previous pending resumption is replaced. This is
428 /// intentional: pending session updates represent desired end state, not an ordered command queue.
429 /// The policy is last write wins.
430 pub async fn update_session_with_bridge(
431 &self,
432 config: SessionUpdateConfig,
433 bridge_message: Option<String>,
434 ) -> Result<()> {
435 // 1. Merge the incoming delta into the runner's canonical, persisted configuration.
436 // This ensures that any future reconnects (e.g., due to network drops) naturally
437 // inherit this latest state.
438 let mut full_config = self.config.write().await;
439 Self::merge_config(&mut full_config, &config);
440
441 let cloned_config = full_config.clone();
442 drop(full_config); // Free the write lock early to avoid deadlocks.
443
444 // 2. Obtain a read lock on the active session transport to attempt the mutation.
445 let guard = self.session.read().await;
446 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
447
448 // 3. Delegate the mutation attempt to the provider-specific adapter.
449 match session.mutate_context(cloned_config).await? {
450 // PATH A: Native Mutability (e.g., OpenAI)
451 // The provider natively updated the context over the active WebSocket.
452 ContextMutationOutcome::Applied => {
453 tracing::info!("Context mutated natively mid-flight.");
454
455 // Since the transport wasn't dropped, we can inject the bridge message
456 // immediately as a standard user message to update the model's short-term memory.
457 if let Some(msg) = bridge_message {
458 let event = crate::events::ClientEvent::Message {
459 role: "user".to_string(),
460 parts: vec![adk_core::types::Part::Text { text: msg }],
461 };
462 session.send_event(event).await?;
463 }
464 Ok(())
465 }
466
467 // PATH B: Rigid Initialization (e.g., Gemini)
468 // The provider requires us to tear down the WebSocket and establish a new one (Phantom Reconnect).
469 ContextMutationOutcome::RequiresResumption(new_config) => {
470 drop(guard); // CRITICAL: Release the read lock on the session before we attempt to mutate it or acquire state locks.
471
472 // 4. Check the Runner's internal state machine to ensure it is safe to tear down the socket.
473 let mut state_guard = self.state.write().await;
474
475 if *state_guard == RunnerState::Idle {
476 // Safe to reconnect: The model is neither generating audio nor executing a tool.
477 drop(state_guard); // Free state lock before the heavy async network operation.
478 tracing::info!("Runner is idle. Executing resumption immediately.");
479
480 if let Err(e) =
481 self.execute_resumption((*new_config).clone(), bridge_message.clone()).await
482 {
483 tracing::error!("Immediate resumption failed: {}. Queueing for retry.", e);
484 // If the reconnect fails (e.g., transient network issue), we must not lose the mutation intent.
485 // We push it back into the queue for the background loop to retry.
486 let mut fallback_state = self.state.write().await;
487 *fallback_state = RunnerState::PendingResumption {
488 config: Box::new(*new_config),
489 bridge_message,
490 attempts: 1,
491 };
492 return Err(e);
493 }
494 } else {
495 // Unsafe to reconnect: Tearing down the socket now would corrupt the in-flight context.
496 // We must queue the mutation. The event loop will execute it once it returns to Idle.
497 if let RunnerState::PendingResumption { .. } = *state_guard {
498 tracing::warn!(
499 "Runner already had a pending resumption. Overwriting with last-write-wins policy."
500 );
501 } else {
502 tracing::info!("Runner is busy ({:?}). Queueing resumption.", *state_guard);
503 }
504
505 // Queue the intent using a last-write-wins policy.
506 *state_guard = RunnerState::PendingResumption {
507 config: new_config,
508 bridge_message,
509 attempts: 0,
510 };
511 }
512 Ok(())
513 }
514 }
515 }
516
517 /// Internal helper to execute a transport resumption (teardown and rebuild).
518 async fn execute_resumption(
519 &self,
520 new_config: crate::config::RealtimeConfig,
521 bridge_message: Option<String>,
522 ) -> Result<()> {
523 tracing::warn!("Executing transport resumption with new configuration.");
524
525 // 1. Acquire exclusive write access to the session pointer.
526 let mut write_guard = self.session.write().await;
527
528 // 2. Explicitly tear down the old WebSocket connection to release upstream resources.
529 if let Some(old_session) = write_guard.as_ref() {
530 if let Err(e) = old_session.close().await {
531 tracing::warn!("Failed to cleanly close old session during resumption: {}", e);
532 }
533 }
534
535 // 3. Establish a brand new connection using the provider-agnostic factory interface.
536 // If the provider supports resumption natively (like Gemini), the `new_config`
537 // payload already contains the cached `resumeToken`.
538 let new_session = self.model.connect(new_config).await?;
539
540 // 4. Overwrite the active session pointer with the newly connected transport.
541 *write_guard = Some(new_session);
542
543 // 5. Release the write lock immediately before attempting to inject any new messages.
544 drop(write_guard);
545
546 // 6. If the orchestrator provided a bridge message (e.g. to explain the domain shift),
547 // safely inject it into the new connection's context window.
548 if let Some(msg) = bridge_message {
549 self.inject_bridge_message(msg).await?;
550 }
551
552 tracing::info!("Resumption complete. New transport established.");
553 Ok(())
554 }
555
556 /// Internal helper to safely inject a bridge message directly into the active session.
557 ///
558 /// This intentionally bypasses the `send_client_event` router to avoid `E0733`
559 /// (un-Boxed async recursion) where `send_client_event` -> `update_session` ->
560 /// `execute_resumption` -> `send_client_event` creates an infinite compiler loop.
561 async fn inject_bridge_message(&self, msg: String) -> Result<()> {
562 tracing::info!("Injecting bridge message post-resumption.");
563 let event = crate::events::ClientEvent::Message {
564 role: "user".to_string(),
565 parts: vec![adk_core::types::Part::Text { text: msg }],
566 };
567 let guard = self.session.read().await;
568 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
569 session.send_event(event).await
570 }
571
572 /// Send audio to the session.
573 pub async fn send_audio(&self, audio_base64: &str) -> Result<()> {
574 let guard = self.session.read().await;
575 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
576 session.send_audio_base64(audio_base64).await
577 }
578
579 /// Send text to the session.
580 pub async fn send_text(&self, text: &str) -> Result<()> {
581 let guard = self.session.read().await;
582 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
583 session.send_text(text).await
584 }
585
586 /// Commit the audio buffer (for manual VAD mode).
587 pub async fn commit_audio(&self) -> Result<()> {
588 let guard = self.session.read().await;
589 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
590 session.commit_audio().await
591 }
592
593 /// Trigger a response from the model.
594 pub async fn create_response(&self) -> Result<()> {
595 let guard = self.session.read().await;
596 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
597 session.create_response().await
598 }
599
600 /// Interrupt the current response.
601 pub async fn interrupt(&self) -> Result<()> {
602 let guard = self.session.read().await;
603 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
604 session.interrupt().await
605 }
606
607 /// Get the next raw event from the session.
608 ///
609 /// # Example
610 ///
611 /// ```rust,ignore
612 /// use adk_realtime::events::ServerEvent;
613 /// use tracing::{info, error};
614 ///
615 /// async fn process_events(runner: &adk_realtime::RealtimeRunner) {
616 /// while let Some(event) = runner.next_event().await {
617 /// match event {
618 /// Ok(ServerEvent::SpeechStarted { .. }) => info!("User is speaking"),
619 /// Ok(_) => info!("Received other event"),
620 /// Err(e) => error!("Error: {e}"),
621 /// }
622 /// }
623 /// }
624 /// ```
625 pub async fn next_event(&self) -> Option<Result<ServerEvent>> {
626 let guard = self.session.read().await;
627 if let Some(session) = guard.as_ref() {
628 // Some sessions might yield inside next_event, but just in case, yield here too
629 tokio::task::yield_now().await;
630 session.next_event().await
631 } else {
632 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
633 None
634 }
635 }
636
637 /// Send a tool response to the session.
638 ///
639 /// # Example
640 ///
641 /// ```rust,ignore
642 /// use adk_realtime::events::ToolResponse;
643 /// use serde_json::json;
644 ///
645 /// async fn example(runner: &adk_realtime::RealtimeRunner) {
646 /// let response = ToolResponse {
647 /// call_id: "call_123".to_string(),
648 /// output: json!({"temperature": 72}),
649 /// };
650 /// runner.send_tool_response(response).await.unwrap();
651 /// }
652 /// ```
653 pub async fn send_tool_response(&self, response: ToolResponse) -> Result<()> {
654 let guard = self.session.read().await;
655 let session = guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
656 session.send_tool_response(response).await
657 }
658
659 /// Run the event loop, processing events until disconnected.
660 pub async fn run(&self) -> Result<()> {
661 loop {
662 let event = {
663 let guard = self.session.read().await;
664 let session =
665 guard.as_ref().ok_or_else(|| RealtimeError::connection("Not connected"))?;
666 session.next_event().await
667 };
668
669 match event {
670 Some(Ok(event)) => {
671 self.handle_event(event).await?;
672 }
673 Some(Err(e)) => {
674 self.event_handler.on_error(&e).await?;
675 return Err(e);
676 }
677 None => {
678 // Session closed
679 break;
680 }
681 }
682 }
683 Ok(())
684 }
685
686 /// Process a single event.
687 async fn handle_event(&self, event: ServerEvent) -> Result<()> {
688 // Track state transitions before forwarding the event
689 match &event {
690 ServerEvent::ResponseCreated { .. } => {
691 let mut state = self.state.write().await;
692 if let RunnerState::Idle = *state {
693 *state = RunnerState::Generating;
694 }
695 }
696 ServerEvent::FunctionCallDone { .. } => {
697 let mut state = self.state.write().await;
698 if let RunnerState::Generating | RunnerState::Idle = *state {
699 *state = RunnerState::ExecutingTool;
700 }
701 }
702 _ => {}
703 }
704
705 match event {
706 ServerEvent::AudioDelta { delta, item_id, .. } => {
707 self.event_handler.on_audio(&delta, &item_id).await?;
708 }
709 ServerEvent::TextDelta { delta, item_id, .. } => {
710 self.event_handler.on_text(&delta, &item_id).await?;
711 }
712 ServerEvent::TranscriptDelta { delta, item_id, .. } => {
713 self.event_handler.on_transcript(&delta, &item_id).await?;
714 }
715 ServerEvent::SpeechStarted { audio_start_ms, .. } => {
716 self.event_handler.on_speech_started(audio_start_ms).await?;
717 }
718 ServerEvent::SpeechStopped { audio_end_ms, .. } => {
719 self.event_handler.on_speech_stopped(audio_end_ms).await?;
720 }
721 ServerEvent::ResponseDone { .. } => {
722 self.event_handler.on_response_done().await?;
723 self.check_resumption_queue().await?;
724 }
725 ServerEvent::FunctionCallDone { call_id, name, arguments, .. } => {
726 if self.runner_config.auto_execute_tools {
727 self.execute_tool_call(&call_id, &name, &arguments).await?;
728 }
729 }
730 ServerEvent::SessionUpdated { session, .. } => {
731 // Check if the generic session update contains a resumption token
732 if let Some(token) = session.get("resumeToken").and_then(|t| t.as_str()) {
733 tracing::info!(
734 "Received Gemini sessionResumption token, saving for future reconnects."
735 );
736 let mut config = self.config.write().await;
737 let mut extra = config.extra.clone().unwrap_or_else(|| serde_json::json!({}));
738 extra["resumeToken"] = serde_json::Value::String(token.to_string());
739 config.extra = Some(extra);
740 }
741 }
742 ServerEvent::Error { error, .. } => {
743 let err = RealtimeError::server(error.code.unwrap_or_default(), error.message);
744 self.event_handler.on_error(&err).await?;
745 }
746 _ => {
747 // Ignore other events
748 }
749 }
750 Ok(())
751 }
752
753 /// Safely transitions the runner back to Idle and executes any queued resumptions.
754 async fn check_resumption_queue(&self) -> Result<()> {
755 // 1. Acquire the state lock to inspect the queue.
756 let mut state = self.state.write().await;
757
758 // 2. Extract the pending configuration and attempt count if one exists.
759 let pending =
760 if let RunnerState::PendingResumption { config, bridge_message, attempts } = &*state {
761 Some((config.clone(), bridge_message.clone(), *attempts))
762 } else {
763 None
764 };
765
766 if let Some((config, bridge_message, attempts)) = pending {
767 tracing::info!(
768 "Executing queued resumption after turn completion. (Attempt {})",
769 attempts + 1
770 );
771
772 // 3. Mark the state as Idle so the background loop is unblocked.
773 *state = RunnerState::Idle;
774
775 // 4. Release the state lock *before* performing the heavy async socket connection.
776 drop(state);
777
778 // 5. Attempt the actual transport teardown/rebuild.
779 if let Err(e) = self.execute_resumption((*config).clone(), bridge_message.clone()).await
780 {
781 tracing::error!("Resumption failed: {}.", e);
782
783 // 6. If the reconnect fails (e.g., transient network error), re-acquire the lock
784 // to safely handle the retry logic without crashing the event loop.
785 let mut fallback_state = self.state.write().await;
786
787 // 7. Enforce a maximum retry budget to prevent infinite "hot-looping"
788 if attempts + 1 >= 3 {
789 tracing::error!(
790 "Maximum resumption attempts reached (3). Dropping queued mutation to prevent infinite loop."
791 );
792 *fallback_state = RunnerState::Idle;
793 } else {
794 tracing::info!("Restoring pending queue state for retry.");
795 *fallback_state = RunnerState::PendingResumption {
796 config,
797 bridge_message,
798 attempts: attempts + 1,
799 };
800 }
801
802 // 8. Do not return Err(e) here, as that would permanently kill the `run()` loop.
803 // Instead, report the error to the downstream handler and allow the event loop to continue spinning.
804 let _ = self.event_handler.on_error(&e).await;
805 }
806 } else {
807 // No resumptions were queued; simply mark as Idle.
808 *state = RunnerState::Idle;
809 }
810 Ok(())
811 }
812
813 /// Execute a tool call and optionally send the response.
814 async fn execute_tool_call(&self, call_id: &str, name: &str, arguments: &str) -> Result<()> {
815 let handler = self.tools.get(name).map(|(_, h)| h.clone());
816
817 let result = if let Some(handler) = handler {
818 let args: serde_json::Value = serde_json::from_str(arguments)
819 .unwrap_or(serde_json::Value::Object(Default::default()));
820
821 let call =
822 ToolCall { call_id: call_id.to_string(), name: name.to_string(), arguments: args };
823
824 match handler.execute(&call).await {
825 Ok(value) => value,
826 Err(e) => serde_json::json!({
827 "error": e.to_string()
828 }),
829 }
830 } else {
831 serde_json::json!({
832 "error": format!("Unknown tool: {}", name)
833 })
834 };
835
836 if self.runner_config.auto_respond_tools {
837 let response = ToolResponse { call_id: call_id.to_string(), output: result };
838
839 let guard = self.session.read().await;
840 if let Some(session) = guard.as_ref() {
841 session.send_tool_response(response).await?;
842 }
843 }
844
845 Ok(())
846 }
847
848 /// Close the session.
849 pub async fn close(&self) -> Result<()> {
850 let guard = self.session.read().await;
851 if let Some(session) = guard.as_ref() {
852 session.close().await?;
853 }
854 Ok(())
855 }
856}