Skip to main content

adk_realtime/
session.rs

1//! Core RealtimeSession trait definition.
2
3use crate::audio::AudioChunk;
4use crate::error::Result;
5use crate::events::{ClientEvent, ServerEvent, ToolResponse};
6use async_trait::async_trait;
7use futures::Stream;
8use std::pin::Pin;
9
10/// A real-time bidirectional streaming session.
11///
12/// This trait provides a unified interface for real-time voice/audio sessions
13/// across different providers (OpenAI, Gemini, etc.).
14///
15/// # Example
16///
17/// ```rust,ignore
18/// use adk_realtime::{RealtimeSession, ServerEvent};
19///
20/// async fn handle_session(session: &dyn RealtimeSession) -> Result<()> {
21///     // Send audio
22///     session.send_audio(audio_chunk).await?;
23///
24///     // Receive events
25///     while let Some(event) = session.next_event().await {
26///         match event? {
27///             ServerEvent::AudioDelta { delta, .. } => { /* play audio */ }
28///             ServerEvent::FunctionCallDone { name, arguments, call_id, .. } => {
29///                 // Execute tool and respond
30///                 let result = execute_tool(&name, &arguments);
31///                 session.send_tool_response(call_id, result).await?;
32///             }
33///             _ => {}
34///         }
35///     }
36///     Ok(())
37/// }
38/// ```
39#[async_trait]
40pub trait RealtimeSession: Send + Sync {
41    /// Get the session ID.
42    fn session_id(&self) -> &str;
43
44    /// Check if the session is currently connected.
45    fn is_connected(&self) -> bool;
46
47    /// Send raw audio data to the server.
48    ///
49    /// The audio should be in the format specified in the session configuration.
50    async fn send_audio(&self, audio: &AudioChunk) -> Result<()>;
51
52    /// Send base64-encoded audio directly.
53    async fn send_audio_base64(&self, audio_base64: &str) -> Result<()>;
54
55    /// Send a text message.
56    async fn send_text(&self, text: &str) -> Result<()>;
57
58    /// Send a tool/function response.
59    async fn send_tool_response(&self, response: ToolResponse) -> Result<()>;
60
61    /// Commit the audio buffer (for manual VAD mode).
62    async fn commit_audio(&self) -> Result<()>;
63
64    /// Clear the audio input buffer.
65    async fn clear_audio(&self) -> Result<()>;
66
67    /// Trigger a response from the model.
68    async fn create_response(&self) -> Result<()>;
69
70    /// Interrupt/cancel the current response.
71    async fn interrupt(&self) -> Result<()>;
72
73    /// Send a raw client event.
74    async fn send_event(&self, event: ClientEvent) -> Result<()>;
75
76    /// Get the next event from the server.
77    ///
78    /// Returns `None` when the session is closed.
79    async fn next_event(&self) -> Option<Result<ServerEvent>>;
80
81    /// Get a stream of server events.
82    fn events(&self) -> Pin<Box<dyn Stream<Item = Result<ServerEvent>> + Send + '_>>;
83
84    /// Close the session gracefully.
85    async fn close(&self) -> Result<()>;
86}
87
88/// Extension trait for RealtimeSession with convenience methods.
89#[async_trait]
90pub trait RealtimeSessionExt: RealtimeSession {
91    /// Send audio and wait for the response to complete.
92    async fn send_audio_and_wait(&self, audio: &AudioChunk) -> Result<Vec<ServerEvent>> {
93        self.send_audio(audio).await?;
94        self.commit_audio().await?;
95
96        let mut events = Vec::new();
97        while let Some(event) = self.next_event().await {
98            let event = event?;
99            let is_done = matches!(&event, ServerEvent::ResponseDone { .. });
100            events.push(event);
101            if is_done {
102                break;
103            }
104        }
105        Ok(events)
106    }
107
108    /// Send text and wait for the response to complete.
109    async fn send_text_and_wait(&self, text: &str) -> Result<Vec<ServerEvent>> {
110        self.send_text(text).await?;
111        self.create_response().await?;
112
113        let mut events = Vec::new();
114        while let Some(event) = self.next_event().await {
115            let event = event?;
116            let is_done = matches!(&event, ServerEvent::ResponseDone { .. });
117            events.push(event);
118            if is_done {
119                break;
120            }
121        }
122        Ok(events)
123    }
124
125    /// Collect all audio chunks from a response (as raw bytes).
126    async fn collect_audio(&self) -> Result<Vec<Vec<u8>>> {
127        let mut audio_chunks = Vec::new();
128        while let Some(event) = self.next_event().await {
129            match event? {
130                ServerEvent::AudioDelta { delta, .. } => {
131                    audio_chunks.push(delta);
132                }
133                ServerEvent::ResponseDone { .. } => break,
134                ServerEvent::Error { error, .. } => {
135                    return Err(crate::error::RealtimeError::server(
136                        error.code.unwrap_or_default(),
137                        error.message,
138                    ));
139                }
140                _ => {}
141            }
142        }
143        Ok(audio_chunks)
144    }
145}
146
147// Blanket implementation
148impl<T: RealtimeSession> RealtimeSessionExt for T {}
149
150/// A boxed session type for dynamic dispatch.
151pub type BoxedSession = Box<dyn RealtimeSession>;