Skip to main content

openai_tools/realtime/
stream.rs

1//! Event handler utilities for the Realtime API.
2
3use super::events::server::*;
4
5/// Callback-based event handler for processing server events.
6///
7/// # Example
8///
9/// ```rust,no_run
10/// use openai_tools::realtime::EventHandler;
11/// use openai_tools::realtime::events::server::ServerEvent;
12///
13/// let mut handler = EventHandler::new();
14/// handler
15///     .on_text_delta(|e| {
16///         print!("{}", e.delta);
17///     })
18///     .on_audio_delta(|e| {
19///         // Play audio: base64::decode(&e.delta)
20///     })
21///     .on_error(|e| {
22///         eprintln!("Error: {}", e.error.message);
23///     });
24///
25/// // In your event loop:
26/// // handler.handle(&event);
27/// ```
28#[allow(clippy::type_complexity)]
29pub struct EventHandler {
30    on_session_created: Option<Box<dyn Fn(&SessionCreatedEvent) + Send + Sync>>,
31    on_session_updated: Option<Box<dyn Fn(&SessionUpdatedEvent) + Send + Sync>>,
32    on_conversation_item_created: Option<Box<dyn Fn(&ConversationItemCreatedEvent) + Send + Sync>>,
33    on_input_audio_transcription_completed: Option<Box<dyn Fn(&InputAudioTranscriptionCompletedEvent) + Send + Sync>>,
34    on_speech_started: Option<Box<dyn Fn(&SpeechStartedEvent) + Send + Sync>>,
35    on_speech_stopped: Option<Box<dyn Fn(&SpeechStoppedEvent) + Send + Sync>>,
36    on_response_created: Option<Box<dyn Fn(&ResponseCreatedEvent) + Send + Sync>>,
37    on_response_done: Option<Box<dyn Fn(&ResponseDoneEvent) + Send + Sync>>,
38    on_text_delta: Option<Box<dyn Fn(&ResponseTextDeltaEvent) + Send + Sync>>,
39    on_text_done: Option<Box<dyn Fn(&ResponseTextDoneEvent) + Send + Sync>>,
40    on_audio_delta: Option<Box<dyn Fn(&ResponseAudioDeltaEvent) + Send + Sync>>,
41    on_audio_done: Option<Box<dyn Fn(&ResponseAudioDoneEvent) + Send + Sync>>,
42    on_audio_transcript_delta: Option<Box<dyn Fn(&ResponseAudioTranscriptDeltaEvent) + Send + Sync>>,
43    on_audio_transcript_done: Option<Box<dyn Fn(&ResponseAudioTranscriptDoneEvent) + Send + Sync>>,
44    on_function_call_arguments_delta: Option<Box<dyn Fn(&ResponseFunctionCallArgumentsDeltaEvent) + Send + Sync>>,
45    on_function_call_arguments_done: Option<Box<dyn Fn(&ResponseFunctionCallArgumentsDoneEvent) + Send + Sync>>,
46    on_rate_limits_updated: Option<Box<dyn Fn(&RateLimitsUpdatedEvent) + Send + Sync>>,
47    on_error: Option<Box<dyn Fn(&ErrorEvent) + Send + Sync>>,
48}
49
50impl EventHandler {
51    /// Create a new event handler with no callbacks set.
52    pub fn new() -> Self {
53        Self {
54            on_session_created: None,
55            on_session_updated: None,
56            on_conversation_item_created: None,
57            on_input_audio_transcription_completed: None,
58            on_speech_started: None,
59            on_speech_stopped: None,
60            on_response_created: None,
61            on_response_done: None,
62            on_text_delta: None,
63            on_text_done: None,
64            on_audio_delta: None,
65            on_audio_done: None,
66            on_audio_transcript_delta: None,
67            on_audio_transcript_done: None,
68            on_function_call_arguments_delta: None,
69            on_function_call_arguments_done: None,
70            on_rate_limits_updated: None,
71            on_error: None,
72        }
73    }
74
75    /// Set callback for session.created events.
76    pub fn on_session_created<F>(&mut self, f: F) -> &mut Self
77    where
78        F: Fn(&SessionCreatedEvent) + Send + Sync + 'static,
79    {
80        self.on_session_created = Some(Box::new(f));
81        self
82    }
83
84    /// Set callback for session.updated events.
85    pub fn on_session_updated<F>(&mut self, f: F) -> &mut Self
86    where
87        F: Fn(&SessionUpdatedEvent) + Send + Sync + 'static,
88    {
89        self.on_session_updated = Some(Box::new(f));
90        self
91    }
92
93    /// Set callback for conversation.item.created events.
94    pub fn on_conversation_item_created<F>(&mut self, f: F) -> &mut Self
95    where
96        F: Fn(&ConversationItemCreatedEvent) + Send + Sync + 'static,
97    {
98        self.on_conversation_item_created = Some(Box::new(f));
99        self
100    }
101
102    /// Set callback for input audio transcription completed events.
103    pub fn on_input_audio_transcription_completed<F>(&mut self, f: F) -> &mut Self
104    where
105        F: Fn(&InputAudioTranscriptionCompletedEvent) + Send + Sync + 'static,
106    {
107        self.on_input_audio_transcription_completed = Some(Box::new(f));
108        self
109    }
110
111    /// Set callback for speech started events.
112    pub fn on_speech_started<F>(&mut self, f: F) -> &mut Self
113    where
114        F: Fn(&SpeechStartedEvent) + Send + Sync + 'static,
115    {
116        self.on_speech_started = Some(Box::new(f));
117        self
118    }
119
120    /// Set callback for speech stopped events.
121    pub fn on_speech_stopped<F>(&mut self, f: F) -> &mut Self
122    where
123        F: Fn(&SpeechStoppedEvent) + Send + Sync + 'static,
124    {
125        self.on_speech_stopped = Some(Box::new(f));
126        self
127    }
128
129    /// Set callback for response.created events.
130    pub fn on_response_created<F>(&mut self, f: F) -> &mut Self
131    where
132        F: Fn(&ResponseCreatedEvent) + Send + Sync + 'static,
133    {
134        self.on_response_created = Some(Box::new(f));
135        self
136    }
137
138    /// Set callback for response.done events.
139    pub fn on_response_done<F>(&mut self, f: F) -> &mut Self
140    where
141        F: Fn(&ResponseDoneEvent) + Send + Sync + 'static,
142    {
143        self.on_response_done = Some(Box::new(f));
144        self
145    }
146
147    /// Set callback for response.text.delta events.
148    pub fn on_text_delta<F>(&mut self, f: F) -> &mut Self
149    where
150        F: Fn(&ResponseTextDeltaEvent) + Send + Sync + 'static,
151    {
152        self.on_text_delta = Some(Box::new(f));
153        self
154    }
155
156    /// Set callback for response.text.done events.
157    pub fn on_text_done<F>(&mut self, f: F) -> &mut Self
158    where
159        F: Fn(&ResponseTextDoneEvent) + Send + Sync + 'static,
160    {
161        self.on_text_done = Some(Box::new(f));
162        self
163    }
164
165    /// Set callback for response.audio.delta events.
166    pub fn on_audio_delta<F>(&mut self, f: F) -> &mut Self
167    where
168        F: Fn(&ResponseAudioDeltaEvent) + Send + Sync + 'static,
169    {
170        self.on_audio_delta = Some(Box::new(f));
171        self
172    }
173
174    /// Set callback for response.audio.done events.
175    pub fn on_audio_done<F>(&mut self, f: F) -> &mut Self
176    where
177        F: Fn(&ResponseAudioDoneEvent) + Send + Sync + 'static,
178    {
179        self.on_audio_done = Some(Box::new(f));
180        self
181    }
182
183    /// Set callback for response.audio_transcript.delta events.
184    pub fn on_audio_transcript_delta<F>(&mut self, f: F) -> &mut Self
185    where
186        F: Fn(&ResponseAudioTranscriptDeltaEvent) + Send + Sync + 'static,
187    {
188        self.on_audio_transcript_delta = Some(Box::new(f));
189        self
190    }
191
192    /// Set callback for response.audio_transcript.done events.
193    pub fn on_audio_transcript_done<F>(&mut self, f: F) -> &mut Self
194    where
195        F: Fn(&ResponseAudioTranscriptDoneEvent) + Send + Sync + 'static,
196    {
197        self.on_audio_transcript_done = Some(Box::new(f));
198        self
199    }
200
201    /// Set callback for function call arguments delta events.
202    pub fn on_function_call_arguments_delta<F>(&mut self, f: F) -> &mut Self
203    where
204        F: Fn(&ResponseFunctionCallArgumentsDeltaEvent) + Send + Sync + 'static,
205    {
206        self.on_function_call_arguments_delta = Some(Box::new(f));
207        self
208    }
209
210    /// Set callback for function call arguments done events.
211    pub fn on_function_call_arguments_done<F>(&mut self, f: F) -> &mut Self
212    where
213        F: Fn(&ResponseFunctionCallArgumentsDoneEvent) + Send + Sync + 'static,
214    {
215        self.on_function_call_arguments_done = Some(Box::new(f));
216        self
217    }
218
219    /// Set callback for rate limits updated events.
220    pub fn on_rate_limits_updated<F>(&mut self, f: F) -> &mut Self
221    where
222        F: Fn(&RateLimitsUpdatedEvent) + Send + Sync + 'static,
223    {
224        self.on_rate_limits_updated = Some(Box::new(f));
225        self
226    }
227
228    /// Set callback for error events.
229    pub fn on_error<F>(&mut self, f: F) -> &mut Self
230    where
231        F: Fn(&ErrorEvent) + Send + Sync + 'static,
232    {
233        self.on_error = Some(Box::new(f));
234        self
235    }
236
237    /// Process a server event, calling the appropriate callback.
238    pub fn handle(&self, event: &ServerEvent) {
239        match event {
240            ServerEvent::SessionCreated(e) => {
241                if let Some(f) = &self.on_session_created {
242                    f(e);
243                }
244            }
245            ServerEvent::SessionUpdated(e) => {
246                if let Some(f) = &self.on_session_updated {
247                    f(e);
248                }
249            }
250            ServerEvent::ConversationItemCreated(e) => {
251                if let Some(f) = &self.on_conversation_item_created {
252                    f(e);
253                }
254            }
255            ServerEvent::InputAudioTranscriptionCompleted(e) => {
256                if let Some(f) = &self.on_input_audio_transcription_completed {
257                    f(e);
258                }
259            }
260            ServerEvent::InputAudioBufferSpeechStarted(e) => {
261                if let Some(f) = &self.on_speech_started {
262                    f(e);
263                }
264            }
265            ServerEvent::InputAudioBufferSpeechStopped(e) => {
266                if let Some(f) = &self.on_speech_stopped {
267                    f(e);
268                }
269            }
270            ServerEvent::ResponseCreated(e) => {
271                if let Some(f) = &self.on_response_created {
272                    f(e);
273                }
274            }
275            ServerEvent::ResponseDone(e) => {
276                if let Some(f) = &self.on_response_done {
277                    f(e);
278                }
279            }
280            ServerEvent::ResponseTextDelta(e) => {
281                if let Some(f) = &self.on_text_delta {
282                    f(e);
283                }
284            }
285            ServerEvent::ResponseTextDone(e) => {
286                if let Some(f) = &self.on_text_done {
287                    f(e);
288                }
289            }
290            ServerEvent::ResponseAudioDelta(e) => {
291                if let Some(f) = &self.on_audio_delta {
292                    f(e);
293                }
294            }
295            ServerEvent::ResponseAudioDone(e) => {
296                if let Some(f) = &self.on_audio_done {
297                    f(e);
298                }
299            }
300            ServerEvent::ResponseAudioTranscriptDelta(e) => {
301                if let Some(f) = &self.on_audio_transcript_delta {
302                    f(e);
303                }
304            }
305            ServerEvent::ResponseAudioTranscriptDone(e) => {
306                if let Some(f) = &self.on_audio_transcript_done {
307                    f(e);
308                }
309            }
310            ServerEvent::ResponseFunctionCallArgumentsDelta(e) => {
311                if let Some(f) = &self.on_function_call_arguments_delta {
312                    f(e);
313                }
314            }
315            ServerEvent::ResponseFunctionCallArgumentsDone(e) => {
316                if let Some(f) = &self.on_function_call_arguments_done {
317                    f(e);
318                }
319            }
320            ServerEvent::RateLimitsUpdated(e) => {
321                if let Some(f) = &self.on_rate_limits_updated {
322                    f(e);
323                }
324            }
325            ServerEvent::Error(e) => {
326                if let Some(f) = &self.on_error {
327                    f(e);
328                }
329            }
330            // Events without specific handlers
331            _ => {}
332        }
333    }
334}
335
336impl Default for EventHandler {
337    fn default() -> Self {
338        Self::new()
339    }
340}