Skip to main content

adk_realtime/
session.rs

1//! Core RealtimeSession trait definition.
2
3use crate::audio::AudioChunk;
4use crate::error::Result;
5use crate::events::{ClientEvent, ServerEvent, ToolResponse};
6use async_trait::async_trait;
7use futures::Stream;
8use std::pin::Pin;
9
10/// The outcome of an attempt to mutate the session context mid-flight.
11#[derive(Debug, Clone)]
12pub enum ContextMutationOutcome {
13    /// Provider successfully updated the active session via sideband.
14    Applied,
15    /// Provider requires the transport to be rebound with a new configuration.
16    RequiresResumption(Box<crate::config::RealtimeConfig>),
17}
18
19/// A real-time bidirectional streaming session.
20///
21/// This trait provides a unified interface for real-time voice/audio sessions
22/// across different providers (OpenAI, Gemini, etc.).
23///
24/// # Example
25///
26/// ```rust,ignore
27/// use adk_realtime::{RealtimeSession, ServerEvent};
28///
29/// async fn handle_session(session: &dyn RealtimeSession) -> Result<()> {
30///     // Send audio
31///     session.send_audio(audio_chunk).await?;
32///
33///     // Receive events
34///     while let Some(event) = session.next_event().await {
35///         match event? {
36///             ServerEvent::AudioDelta { delta, .. } => { /* play audio */ }
37///             ServerEvent::FunctionCallDone { name, arguments, call_id, .. } => {
38///                 // Execute tool and respond
39///                 let result = execute_tool(&name, &arguments);
40///                 session.send_tool_response(call_id, result).await?;
41///             }
42///             _ => {}
43///         }
44///     }
45///     Ok(())
46/// }
47/// ```
48#[async_trait]
49pub trait RealtimeSession: Send + Sync {
50    /// Get the session ID.
51    fn session_id(&self) -> &str;
52
53    /// Check if the session is currently connected.
54    fn is_connected(&self) -> bool;
55
56    /// Send raw audio data to the server.
57    ///
58    /// The audio should be in the format specified in the session configuration.
59    async fn send_audio(&self, audio: &AudioChunk) -> Result<()>;
60
61    /// Send base64-encoded audio directly.
62    async fn send_audio_base64(&self, audio_base64: &str) -> Result<()>;
63
64    /// Send a text message.
65    async fn send_text(&self, text: &str) -> Result<()>;
66
67    /// Send a tool/function response.
68    async fn send_tool_response(&self, response: ToolResponse) -> Result<()>;
69
70    /// Commit the audio buffer (for manual VAD mode).
71    async fn commit_audio(&self) -> Result<()>;
72
73    /// Clear the audio input buffer.
74    async fn clear_audio(&self) -> Result<()>;
75
76    /// Trigger a response from the model.
77    async fn create_response(&self) -> Result<()>;
78
79    /// Interrupt/cancel the current response.
80    async fn interrupt(&self) -> Result<()>;
81
82    /// Send a raw client event.
83    async fn send_event(&self, event: ClientEvent) -> Result<()>;
84
85    /// Get the next event from the server.
86    ///
87    /// Returns `None` when the session is closed.
88    async fn next_event(&self) -> Option<Result<ServerEvent>>;
89
90    /// Get a stream of server events.
91    fn events(&self) -> Pin<Box<dyn Stream<Item = Result<ServerEvent>> + Send + '_>>;
92
93    /// Close the session gracefully.
94    async fn close(&self) -> Result<()>;
95
96    /// Attempt to mutate the session parameters mid-flight.
97    ///
98    /// For providers that support native hot-swapping (e.g., OpenAI), this
99    /// mutates the parameters without tearing down the connection and returns `Ok(ContextMutationOutcome::Applied)`.
100    /// For providers that require a static configuration (e.g., Gemini), this
101    /// returns `Ok(ContextMutationOutcome::RequiresResumption(config))` to signal
102    /// the runner to queue a session reconnect or resumption safely.
103    async fn mutate_context(
104        &self,
105        config: crate::config::RealtimeConfig,
106    ) -> Result<ContextMutationOutcome>;
107}
108
109/// Extension trait for RealtimeSession with convenience methods.
110#[async_trait]
111pub trait RealtimeSessionExt: RealtimeSession {
112    /// Send audio and wait for the response to complete.
113    async fn send_audio_and_wait(&self, audio: &AudioChunk) -> Result<Vec<ServerEvent>> {
114        self.send_audio(audio).await?;
115        self.commit_audio().await?;
116
117        let mut events = Vec::new();
118        while let Some(event) = self.next_event().await {
119            let event = event?;
120            let is_done = matches!(&event, ServerEvent::ResponseDone { .. });
121            events.push(event);
122            if is_done {
123                break;
124            }
125        }
126        Ok(events)
127    }
128
129    /// Send text and wait for the response to complete.
130    async fn send_text_and_wait(&self, text: &str) -> Result<Vec<ServerEvent>> {
131        self.send_text(text).await?;
132        self.create_response().await?;
133
134        let mut events = Vec::new();
135        while let Some(event) = self.next_event().await {
136            let event = event?;
137            let is_done = matches!(&event, ServerEvent::ResponseDone { .. });
138            events.push(event);
139            if is_done {
140                break;
141            }
142        }
143        Ok(events)
144    }
145
146    /// Collect all audio chunks from a response (as raw bytes).
147    async fn collect_audio(&self) -> Result<Vec<Vec<u8>>> {
148        let mut audio_chunks = Vec::new();
149        while let Some(event) = self.next_event().await {
150            match event? {
151                ServerEvent::AudioDelta { delta, .. } => {
152                    audio_chunks.push(delta);
153                }
154                ServerEvent::ResponseDone { .. } => break,
155                ServerEvent::Error { error, .. } => {
156                    return Err(crate::error::RealtimeError::server(
157                        error.code.unwrap_or_default(),
158                        error.message,
159                    ));
160                }
161                _ => {}
162            }
163        }
164        Ok(audio_chunks)
165    }
166}
167
168// Blanket implementation
169impl<T: RealtimeSession> RealtimeSessionExt for T {}
170
171/// A boxed session type for dynamic dispatch.
172pub type BoxedSession = Box<dyn RealtimeSession>;