adk_realtime/session.rs
1//! Core RealtimeSession trait definition.
2
3use crate::audio::AudioChunk;
4use crate::error::Result;
5use crate::events::{ClientEvent, ServerEvent, ToolResponse};
6use async_trait::async_trait;
7use futures::Stream;
8use std::pin::Pin;
9
10/// The outcome of an attempt to mutate the session context mid-flight.
11#[derive(Debug, Clone)]
12pub enum ContextMutationOutcome {
13 /// Provider successfully updated the active session via sideband.
14 Applied,
15 /// Provider requires the transport to be rebound with a new configuration.
16 RequiresResumption(Box<crate::config::RealtimeConfig>),
17}
18
19/// A real-time bidirectional streaming session.
20///
21/// This trait provides a unified interface for real-time voice/audio sessions
22/// across different providers (OpenAI, Gemini, etc.).
23///
24/// # Example
25///
26/// ```rust,ignore
27/// use adk_realtime::{RealtimeSession, ServerEvent};
28///
29/// async fn handle_session(session: &dyn RealtimeSession) -> Result<()> {
30/// // Send audio
31/// session.send_audio(audio_chunk).await?;
32///
33/// // Receive events
34/// while let Some(event) = session.next_event().await {
35/// match event? {
36/// ServerEvent::AudioDelta { delta, .. } => { /* play audio */ }
37/// ServerEvent::FunctionCallDone { name, arguments, call_id, .. } => {
38/// // Execute tool and respond
39/// let result = execute_tool(&name, &arguments);
40/// session.send_tool_response(call_id, result).await?;
41/// }
42/// _ => {}
43/// }
44/// }
45/// Ok(())
46/// }
47/// ```
48#[async_trait]
49pub trait RealtimeSession: Send + Sync {
50 /// Get the session ID.
51 fn session_id(&self) -> &str;
52
53 /// Check if the session is currently connected.
54 fn is_connected(&self) -> bool;
55
56 /// Send raw audio data to the server.
57 ///
58 /// The audio should be in the format specified in the session configuration.
59 async fn send_audio(&self, audio: &AudioChunk) -> Result<()>;
60
61 /// Send base64-encoded audio directly.
62 async fn send_audio_base64(&self, audio_base64: &str) -> Result<()>;
63
64 /// Send a text message.
65 async fn send_text(&self, text: &str) -> Result<()>;
66
67 /// Send a tool/function response.
68 async fn send_tool_response(&self, response: ToolResponse) -> Result<()>;
69
70 /// Commit the audio buffer (for manual VAD mode).
71 async fn commit_audio(&self) -> Result<()>;
72
73 /// Clear the audio input buffer.
74 async fn clear_audio(&self) -> Result<()>;
75
76 /// Trigger a response from the model.
77 async fn create_response(&self) -> Result<()>;
78
79 /// Interrupt/cancel the current response.
80 async fn interrupt(&self) -> Result<()>;
81
82 /// Send a raw client event.
83 async fn send_event(&self, event: ClientEvent) -> Result<()>;
84
85 /// Get the next event from the server.
86 ///
87 /// Returns `None` when the session is closed.
88 async fn next_event(&self) -> Option<Result<ServerEvent>>;
89
90 /// Get a stream of server events.
91 fn events(&self) -> Pin<Box<dyn Stream<Item = Result<ServerEvent>> + Send + '_>>;
92
93 /// Close the session gracefully.
94 async fn close(&self) -> Result<()>;
95
96 /// Attempt to mutate the session parameters mid-flight.
97 ///
98 /// For providers that support native hot-swapping (e.g., OpenAI), this
99 /// mutates the parameters without tearing down the connection and returns `Ok(ContextMutationOutcome::Applied)`.
100 /// For providers that require a static configuration (e.g., Gemini), this
101 /// returns `Ok(ContextMutationOutcome::RequiresResumption(config))` to signal
102 /// the runner to queue a session reconnect or resumption safely.
103 async fn mutate_context(
104 &self,
105 config: crate::config::RealtimeConfig,
106 ) -> Result<ContextMutationOutcome>;
107}
108
109/// Extension trait for RealtimeSession with convenience methods.
110#[async_trait]
111pub trait RealtimeSessionExt: RealtimeSession {
112 /// Send audio and wait for the response to complete.
113 async fn send_audio_and_wait(&self, audio: &AudioChunk) -> Result<Vec<ServerEvent>> {
114 self.send_audio(audio).await?;
115 self.commit_audio().await?;
116
117 let mut events = Vec::new();
118 while let Some(event) = self.next_event().await {
119 let event = event?;
120 let is_done = matches!(&event, ServerEvent::ResponseDone { .. });
121 events.push(event);
122 if is_done {
123 break;
124 }
125 }
126 Ok(events)
127 }
128
129 /// Send text and wait for the response to complete.
130 async fn send_text_and_wait(&self, text: &str) -> Result<Vec<ServerEvent>> {
131 self.send_text(text).await?;
132 self.create_response().await?;
133
134 let mut events = Vec::new();
135 while let Some(event) = self.next_event().await {
136 let event = event?;
137 let is_done = matches!(&event, ServerEvent::ResponseDone { .. });
138 events.push(event);
139 if is_done {
140 break;
141 }
142 }
143 Ok(events)
144 }
145
146 /// Collect all audio chunks from a response (as raw bytes).
147 async fn collect_audio(&self) -> Result<Vec<Vec<u8>>> {
148 let mut audio_chunks = Vec::new();
149 while let Some(event) = self.next_event().await {
150 match event? {
151 ServerEvent::AudioDelta { delta, .. } => {
152 audio_chunks.push(delta);
153 }
154 ServerEvent::ResponseDone { .. } => break,
155 ServerEvent::Error { error, .. } => {
156 return Err(crate::error::RealtimeError::server(
157 error.code.unwrap_or_default(),
158 error.message,
159 ));
160 }
161 _ => {}
162 }
163 }
164 Ok(audio_chunks)
165 }
166}
167
168// Blanket implementation
169impl<T: RealtimeSession> RealtimeSessionExt for T {}
170
171/// A boxed session type for dynamic dispatch.
172pub type BoxedSession = Box<dyn RealtimeSession>;