1use std::error::Error;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5
6use crate::context::{AudioContextState, BaseAudioContext, ConcreteBaseAudioContext};
7use crate::events::{EventDispatch, EventHandler, EventLoop, EventPayload, EventType};
8use crate::io::{self, AudioBackendManager, ControlThreadInit, NoneBackend, RenderThreadInit};
9use crate::media_devices::{enumerate_devices_sync, MediaDeviceInfoKind};
10use crate::media_streams::{MediaStream, MediaStreamTrack};
11use crate::message::{ControlMessage, OneshotNotify};
12use crate::node::{self, AudioNodeOptions};
13use crate::render::graph::Graph;
14use crate::MediaElement;
15use crate::{AudioPlaybackStats, AudioRenderCapacity, Event};
16
17use futures_channel::oneshot;
18
19fn is_valid_sink_id(sink_id: &str) -> bool {
23 if sink_id.is_empty() || sink_id == "none" {
24 true
25 } else {
26 enumerate_devices_sync()
27 .into_iter()
28 .filter(|d| d.kind() == MediaDeviceInfoKind::AudioOutput)
29 .any(|d| d.device_id() == sink_id)
30 }
31}
32
/// Errors that can occur while constructing an [`AudioContext`].
#[derive(Debug)]
enum AudioContextError {
    /// The requested `sink_id` did not match any available audio output device.
    SinkNotFound { sink_id: String },
    /// The audio backend failed while setting up or operating the output stream.
    Backend { error: io::AudioBackendError },
}
38
impl std::fmt::Display for AudioContextError {
    // Messages are prefixed with DOMException-style names ("NotFoundError",
    // "InvalidStateError") to mirror the Web Audio API error vocabulary.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SinkNotFound { sink_id } => {
                write!(f, "NotFoundError - Invalid sinkId: {sink_id:?}")
            }
            Self::Backend { error } => write!(f, "InvalidStateError - {error}"),
        }
    }
}
49
// Marker impl: `Debug` + `Display` above satisfy the `std::error::Error` contract.
impl Error for AudioContextError {}
51
/// Enables `?` on backend results inside the constructors by wrapping the
/// backend error in [`AudioContextError::Backend`].
impl From<io::AudioBackendError> for AudioContextError {
    fn from(error: io::AudioBackendError) -> Self {
        Self::Backend { error }
    }
}
57
/// Hint for the latency/power trade-off of the audio output stream.
#[derive(Copy, Clone, Debug, Default)]
pub enum AudioContextLatencyCategory {
    /// Balance audio output latency against power consumption.
    Balanced,
    /// Lowest latency without glitching (the default, suited for interactive use).
    #[default]
    Interactive,
    /// Prioritize uninterrupted playback over low output latency.
    Playback,
    /// Explicit latency value.
    /// NOTE(review): unit assumed to be seconds — confirm against the backend
    /// that consumes this hint.
    Custom(f64),
}
76
/// Hint for the render quantum size of the context.
///
/// Only the default size is currently expressible; the enum is
/// `#[non_exhaustive]` so more categories can be added later.
// The two separate `#[derive]` attributes were merged into one idiomatic list.
#[derive(Copy, Clone, Debug, Default)]
#[non_exhaustive]
pub enum AudioContextRenderSizeCategory {
    /// The default render quantum size.
    #[default]
    Default,
}
88
/// Options for constructing an [`AudioContext`], modeled after the
/// JavaScript-style options object (all fields have sensible defaults).
#[derive(Clone, Debug, Default)]
pub struct AudioContextOptions {
    /// Latency/power trade-off hint for the output stream.
    pub latency_hint: AudioContextLatencyCategory,

    /// Requested sample rate; `None` lets the backend pick one.
    pub sample_rate: Option<f32>,

    /// Output device selector:
    /// - `""` selects the default audio output device
    /// - `"none"` renders the graph without any audio output device
    /// - otherwise must be a `device_id` from device enumeration
    pub sink_id: String,

    /// Requested render quantum size category.
    pub render_size_hint: AudioContextRenderSizeCategory,
}
122
/// An audio graph whose output is routed to a real-time audio device.
#[allow(clippy::module_name_repetitions)]
pub struct AudioContext {
    /// Shared context state, common with other context flavors.
    base: ConcreteBaseAudioContext,
    /// The audio backend doing the actual device I/O; boxed trait object
    /// because the concrete backend is chosen at runtime.
    backend_manager: Mutex<Box<dyn AudioBackendManager>>,
    /// Handle for render-capacity metrics, cloned out via `render_capacity()`.
    render_capacity: AudioRenderCapacity,
    /// Handle for playback statistics, cloned out via `playback_stats()`.
    playback_stats: AudioPlaybackStats,
    /// Shared flag read by `suspend`/`suspend_sync`: while set, suspension is
    /// allowed even though the state is not yet `Running`.
    /// NOTE(review): set/cleared by the render thread init — confirm in `io`.
    startup_pending: std::sync::Arc<AtomicBool>,
    /// Kept so the output stream can be rebuilt on `set_sink_id_sync`.
    render_thread_init: RenderThreadInit,
}
141
impl std::fmt::Debug for AudioContext {
    // Manual impl showing a concise, useful subset of fields — presumably
    // because not all fields implement `Debug`; `finish_non_exhaustive`
    // signals the omission.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AudioContext")
            .field("sink_id", &self.sink_id())
            .field("base_latency", &self.base_latency())
            .field("output_latency", &self.output_latency())
            .field("base", &self.base())
            .finish_non_exhaustive()
    }
}
152
impl Drop for AudioContext {
    fn drop(&mut self) {
        // When a still-running context is dropped, the real backend is
        // deliberately leaked instead of destroyed: a void `NoneBackend`
        // tombstone is swapped into the struct and the original box is
        // released via `Box::leak`. NOTE(review): presumably this keeps the
        // live audio stream (and threads referencing the backend) alive past
        // the drop — confirm against the backend's ownership model.
        if self.state() == AudioContextState::Running {
            let tombstone = Box::new(NoneBackend::void());
            let original = std::mem::replace(self.backend_manager.get_mut().unwrap(), tombstone);
            Box::leak(original);
        }
    }
}
163
impl BaseAudioContext for AudioContext {
    /// Expose the shared base so all `BaseAudioContext` trait methods work on
    /// this context.
    fn base(&self) -> &ConcreteBaseAudioContext {
        &self.base
    }
}
169
impl Default for AudioContext {
    /// Equivalent to `AudioContext::new(AudioContextOptions::default())`;
    /// panics if the default audio backend cannot be started.
    fn default() -> Self {
        Self::new(AudioContextOptions::default())
    }
}
175
176impl AudioContext {
    /// Creates and starts a new `AudioContext`.
    ///
    /// # Panics
    ///
    /// Panics when the sink id is invalid or the audio backend cannot be set
    /// up; use [`Self::try_new`] for the fallible variant.
    #[must_use]
    pub fn new(options: AudioContextOptions) -> Self {
        Self::try_new_inner(options).unwrap_or_else(|e| panic!("{e}"))
    }

    /// Creates and starts a new `AudioContext`, returning an error instead of
    /// panicking when setup fails.
    ///
    /// # Errors
    ///
    /// Returns an error when the sink id is invalid or the audio backend
    /// cannot be set up.
    pub fn try_new(options: AudioContextOptions) -> Result<Self, Box<dyn Error>> {
        Self::try_new_inner(options).map_err(Into::into)
    }
219
    /// Fallible constructor backing [`Self::new`] and [`Self::try_new`].
    ///
    /// Validates the sink id, starts the audio backend, hands the initial
    /// graph to the render thread, and spins up the event loop.
    fn try_new_inner(options: AudioContextOptions) -> Result<Self, AudioContextError> {
        // Validate the sink up front so we fail before spawning anything.
        if !is_valid_sink_id(&options.sink_id) {
            return Err(AudioContextError::SinkNotFound {
                sink_id: options.sink_id,
            });
        }

        // Channels and shared state wiring the control and render threads.
        let (control_thread_init, render_thread_init) = io::thread_init();
        let startup_pending = Arc::clone(&render_thread_init.startup_pending);
        let backend = io::build_output(options, render_thread_init.clone())?;

        let ControlThreadInit {
            state,
            frames_played,
            stats,
            ctrl_msg_send,
            event_send,
            event_recv,
        } = control_thread_init;

        // Build the graph here and ship it to the render thread; the id
        // producer/consumer pair recycles node ids across the threads.
        let (node_id_producer, node_id_consumer) = llq::Queue::new().split();
        let graph = Graph::new(node_id_producer);
        let message = ControlMessage::Startup { graph };
        ctrl_msg_send.send(message).unwrap();

        let event_loop = EventLoop::new(event_recv);

        let base = ConcreteBaseAudioContext::new(
            backend.sample_rate(),
            backend.number_of_channels(),
            state,
            frames_played,
            ctrl_msg_send,
            event_send,
            event_loop.clone(),
            false, // NOTE(review): flag meaning not visible here — presumably "offline"; confirm
            node_id_consumer,
        );

        let render_capacity = AudioRenderCapacity::new(base.clone(), stats.clone());
        let playback_stats = AudioPlaybackStats::new(base.clone(), stats);

        // Start delivering events asynchronously on a dedicated thread.
        event_loop.run_in_thread();

        Ok(Self {
            base,
            backend_manager: Mutex::new(backend),
            render_capacity,
            playback_stats,
            startup_pending,
            render_thread_init,
        })
    }
282
    /// The base latency of this context, in seconds.
    ///
    /// Always `0.` in this implementation.
    #[allow(clippy::unused_self)]
    #[must_use]
    pub fn base_latency(&self) -> f64 {
        0.
    }

    /// An estimate of the output latency of this context, in seconds.
    ///
    /// # Panics
    ///
    /// Panics when the backend can no longer report its latency (the panic
    /// message uses DOMException-style "InvalidStateError").
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
    pub fn output_latency(&self) -> f64 {
        self.try_output_latency()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"))
    }

    /// Fallible counterpart of [`Self::output_latency`].
    fn try_output_latency(&self) -> Result<f64, Box<dyn Error>> {
        Ok(self.backend_manager.lock().unwrap().output_latency()?)
    }

    /// Identifier of the audio output device this context plays through
    /// (empty string for the default device).
    #[allow(clippy::missing_panics_doc)]
    pub fn sink_id(&self) -> String {
        self.backend_manager.lock().unwrap().sink_id().to_owned()
    }

    /// Returns a handle to the render-capacity metrics of this context.
    #[must_use]
    pub fn render_capacity(&self) -> AudioRenderCapacity {
        self.render_capacity.clone()
    }

    /// Returns a handle to the playback statistics of this context.
    #[must_use]
    pub fn playback_stats(&self) -> AudioPlaybackStats {
        self.playback_stats.clone()
    }
333
    /// Switches the audio output to another device (blocking variant).
    ///
    /// `sink_id` must be empty (default device), `"none"` (null sink), or a
    /// `device_id` from device enumeration. The running graph is recovered
    /// from the render thread, the stream is rebuilt on the new sink, and the
    /// graph is restarted — preserving the prior running/suspended state.
    ///
    /// # Errors
    ///
    /// Returns an error when the sink id is unknown, or when closing/building
    /// the audio stream fails.
    #[allow(clippy::needless_collect, clippy::missing_panics_doc)]
    pub fn set_sink_id_sync(&self, sink_id: String) -> Result<(), Box<dyn Error>> {
        log::debug!("SinkChange requested");
        if self.sink_id() == sink_id {
            log::debug!("SinkChange: no-op");
            return Ok(());
        }

        if !is_valid_sink_id(&sink_id) {
            Err(format!("NotFoundError: invalid sinkId {sink_id}"))?;
        };

        // Hold the backend lock for the whole swap so no other stream
        // operation can interleave.
        log::debug!("SinkChange: locking backend manager");
        let mut backend_manager_guard = self.backend_manager.lock().unwrap();
        let original_state = self.state();
        if original_state == AudioContextState::Closed {
            log::debug!("SinkChange: context is closed");
            return Ok(());
        }

        // Also lock the control message sender so no new messages race the
        // graph hand-over below.
        log::debug!("SinkChange: locking message channel");
        let ctrl_msg_send = self.base.lock_control_msg_sender();

        // Drain messages the render thread has not consumed yet; they are
        // replayed after the new stream is up.
        let mut pending_msgs: Vec<_> = self.render_thread_init.ctrl_msg_recv.try_iter().collect();

        let graph = if matches!(pending_msgs.first(), Some(ControlMessage::Startup { .. })) {
            // The render thread never started the graph: reclaim it straight
            // from the still-pending Startup message.
            log::debug!("SinkChange: recover unstarted graph");

            let msg = pending_msgs.remove(0);
            match msg {
                ControlMessage::Startup { graph } => graph,
                _ => unreachable!(),
            }
        } else {
            // The graph lives on the render thread: ask for it back.
            log::debug!("SinkChange: recover graph from render thread");

            let (graph_send, graph_recv) = crossbeam_channel::bounded(1);
            let message = ControlMessage::CloseAndRecycle { sender: graph_send };
            ctrl_msg_send.send(message).unwrap();
            if original_state == AudioContextState::Suspended {
                // A suspended stream must run briefly so the render thread
                // can process CloseAndRecycle and send the graph back.
                backend_manager_guard.resume()?;
            }
            graph_recv.recv().unwrap()
        };

        log::debug!("SinkChange: closing audio stream");
        backend_manager_guard.close()?;

        // Rebuild the stream on the new sink, keeping the current sample rate
        // so the recovered graph stays valid.
        let options = AudioContextOptions {
            sample_rate: Some(self.sample_rate()),
            latency_hint: AudioContextLatencyCategory::default(),
            sink_id,
            render_size_hint: AudioContextRenderSizeCategory::default(),
        };
        log::debug!("SinkChange: starting audio stream");
        *backend_manager_guard = io::build_output(options, self.render_thread_init.clone())?;

        // Restore the pre-switch suspended state.
        if original_state == AudioContextState::Suspended {
            log::debug!("SinkChange: suspending audio stream");
            backend_manager_guard.suspend()?;
        }

        // Restart the graph, then replay any messages drained above.
        let message = ControlMessage::Startup { graph };
        ctrl_msg_send.send(message).unwrap();

        pending_msgs
            .into_iter()
            .for_each(|m| self.base().send_control_msg(m));

        drop(backend_manager_guard);

        // Notify subscribers; failure to deliver is not an error here.
        let _ = self.base.send_event(EventDispatch::sink_change());

        log::debug!("SinkChange: done");
        Ok(())
    }
433
434 pub fn set_onsinkchange<F: FnMut(Event) + Send + 'static>(&self, mut callback: F) {
439 let callback = move |_| {
440 callback(Event {
441 type_: "sinkchange",
442 })
443 };
444
445 self.base().set_event_handler(
446 EventType::SinkChange,
447 EventHandler::Multiple(Box::new(callback)),
448 );
449 }
450
451 pub fn clear_onsinkchange(&self) {
453 self.base().clear_event_handler(EventType::SinkChange);
454 }
455
    /// Collects diagnostic information and delivers it to `callback` once.
    ///
    /// The buffer is pre-filled here with control-thread info (backend name,
    /// sink id, output latency); it is then handed to the render thread,
    /// which reports back via a one-shot `Diagnostics` event.
    #[allow(clippy::missing_panics_doc)]
    #[doc(hidden)]
    pub fn run_diagnostics<F: Fn(String) + Send + 'static>(&self, callback: F) {
        let mut buffer = Vec::with_capacity(32 * 1024);
        {
            let backend = self.backend_manager.lock().unwrap();
            use std::io::Write;
            writeln!(&mut buffer, "backend: {}", backend.name()).ok();
            writeln!(&mut buffer, "sink id: {}", backend.sink_id()).ok();
            writeln!(
                &mut buffer,
                "output latency: {:.6}",
                backend.output_latency().unwrap_or(0.)
            )
            .ok();
        }
        let callback = move |v| match v {
            EventPayload::Diagnostics(v) => {
                // The payload is expected to be UTF-8 text; a failure here is
                // a bug in whoever filled the buffer.
                let s = String::from_utf8(v).unwrap();
                callback(s);
            }
            _ => unreachable!(),
        };

        // One-shot handler: consumed after the first Diagnostics event.
        self.base().set_event_handler(
            EventType::Diagnostics,
            EventHandler::Once(Box::new(callback)),
        );

        self.base()
            .send_control_msg(ControlMessage::RunDiagnostics { buffer });
    }
488
    /// Suspends the progression of time in the audio context.
    ///
    /// Waits for the render thread to acknowledge the suspension, then
    /// suspends the backend audio stream.
    ///
    /// # Panics
    ///
    /// Panics when the backend fails to suspend the stream or when the render
    /// thread drops the acknowledgment channel.
    pub async fn suspend(&self) {
        log::debug!("Suspend called");

        let state = self.state();
        if state == AudioContextState::Closed {
            log::debug!("Suspend no-op - context is closed");
            return;
        }

        // A context whose stream startup is still pending may also be
        // suspended — hence the extra `startup_pending` check.
        if state != AudioContextState::Running && !self.startup_pending.load(Ordering::Acquire) {
            log::debug!("Suspend no-op - context is not running");
            return;
        }

        let (sender, receiver) = oneshot::channel();
        let notify = OneshotNotify::Async(sender);
        self.base
            .suspend_control_msgs(ControlMessage::Suspend { notify });

        log::debug!("Suspending audio graph, waiting for signal..");
        receiver.await.unwrap();

        // The backend mutex is only taken here, after the await — a std
        // Mutex guard must not be held across an await point.
        log::debug!("Suspended audio graph. Suspending audio stream..");
        self.backend_manager
            .lock()
            .unwrap()
            .suspend()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

        log::debug!("Suspended audio stream");
    }
536
    /// Resumes the progression of time in a suspended audio context.
    ///
    /// # Panics
    ///
    /// Panics when the backend fails to resume the stream or when the render
    /// thread drops the acknowledgment channel.
    pub async fn resume(&self) {
        let (sender, receiver) = oneshot::channel();

        // This scope is essential: the std Mutex guard on the backend must be
        // dropped before the `.await` below.
        {
            log::debug!("Resume called, locking backend manager");
            let backend_manager_guard = self.backend_manager.lock().unwrap();

            if self.state() != AudioContextState::Suspended {
                log::debug!("Resume no-op - context is not suspended");
                return;
            }

            // Restart the audio stream first, then wake the graph so it has
            // a live output to render into.
            backend_manager_guard
                .resume()
                .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

            log::debug!("Resumed audio stream, waking audio graph");
            let notify = OneshotNotify::Async(sender);
            self.base
                .resume_control_msgs(ControlMessage::Resume { notify });

        }

        // Wait for the render thread to acknowledge the resume.
        receiver.await.unwrap();
        log::debug!("Resumed audio graph");
    }
578
    /// Closes the `AudioContext`, releasing the system resources it uses.
    ///
    /// # Panics
    ///
    /// Panics when the backend fails to close the stream or when the render
    /// thread drops the acknowledgment channel.
    pub async fn close(&self) {
        log::debug!("Close called");

        if self.state() == AudioContextState::Closed {
            log::debug!("Close no-op - context is already closed");
            return;
        }

        // Stop emitting render-capacity events before tearing down the graph.
        self.render_capacity.stop();

        if self.state() == AudioContextState::Running {
            // Running: let the render thread wind down, then await its ack.
            let (sender, receiver) = oneshot::channel();
            let notify = OneshotNotify::Async(sender);
            self.base.send_control_msg(ControlMessage::Close { notify });

            log::debug!("Suspending audio graph, waiting for signal..");
            receiver.await.unwrap();
        } else {
            // Suspended: no render-thread ack needed, just flip the state.
            self.base.set_state(AudioContextState::Closed);
        }

        // Backend mutex taken only after the await (see `resume`).
        log::debug!("Suspended audio graph. Closing audio stream..");
        self.backend_manager
            .lock()
            .unwrap()
            .close()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

        log::debug!("Closed audio stream");
    }
624
    /// Suspends the progression of time in the audio context (blocking
    /// variant of [`Self::suspend`]).
    ///
    /// # Panics
    ///
    /// Panics when the backend fails to suspend the stream.
    pub fn suspend_sync(&self) {
        log::debug!("Suspend_sync called, locking backend manager");
        // Unlike the async variant, the backend lock may be held across the
        // blocking rendezvous below — there is no await point here.
        let backend_manager_guard = self.backend_manager.lock().unwrap();

        let state = self.state();
        if state == AudioContextState::Closed {
            log::debug!("Suspend_sync no-op - context is closed");
            return;
        }

        // Also proceed while stream startup is still pending (see `suspend`).
        if state != AudioContextState::Running && !self.startup_pending.load(Ordering::Acquire) {
            log::debug!("Suspend_sync no-op - context is not running");
            return;
        }

        // Zero-capacity channel: `recv` returns only once the render thread
        // has processed the Suspend message.
        let (sender, receiver) = crossbeam_channel::bounded(0);
        let notify = OneshotNotify::Sync(sender);
        self.base
            .suspend_control_msgs(ControlMessage::Suspend { notify });

        log::debug!("Suspending audio graph, waiting for signal..");
        receiver.recv().ok();

        log::debug!("Suspended audio graph. Suspending audio stream..");
        backend_manager_guard
            .suspend()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

        log::debug!("Suspended audio stream");
    }
674
    /// Resumes a suspended audio context (blocking variant of
    /// [`Self::resume`]).
    ///
    /// # Panics
    ///
    /// Panics when the backend fails to resume the stream.
    pub fn resume_sync(&self) {
        log::debug!("Resume_sync called, locking backend manager");
        // Lock held for the whole call; safe because nothing awaits here.
        let backend_manager_guard = self.backend_manager.lock().unwrap();

        if self.state() != AudioContextState::Suspended {
            log::debug!("Resume no-op - context is not suspended");
            return;
        }

        // Stream first, graph second (see async `resume` for rationale).
        backend_manager_guard
            .resume()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

        log::debug!("Resumed audio stream, waking audio graph");
        // Zero-capacity rendezvous with the render thread (see suspend_sync).
        let (sender, receiver) = crossbeam_channel::bounded(0);
        let notify = OneshotNotify::Sync(sender);
        self.base
            .resume_control_msgs(ControlMessage::Resume { notify });

        receiver.recv().ok();
        log::debug!("Resumed audio graph");
    }
714
    /// Closes the `AudioContext`, releasing its system resources (blocking
    /// variant of [`Self::close`]).
    ///
    /// # Panics
    ///
    /// Panics when the backend fails to close the stream.
    pub fn close_sync(&self) {
        log::debug!("Close_sync called, locking backend manager");
        // Lock held for the whole call; no await points in this function.
        let backend_manager_guard = self.backend_manager.lock().unwrap();

        if self.state() == AudioContextState::Closed {
            log::debug!("Close no-op - context is already closed");
            return;
        }

        // Stop emitting render-capacity events before tearing down the graph.
        self.render_capacity.stop();

        if self.state() == AudioContextState::Running {
            // Running: zero-capacity rendezvous with the render thread's ack.
            let (sender, receiver) = crossbeam_channel::bounded(0);
            let notify = OneshotNotify::Sync(sender);
            self.base.send_control_msg(ControlMessage::Close { notify });

            log::debug!("Suspending audio graph, waiting for signal..");
            receiver.recv().ok();
        } else {
            // Suspended: just flip the state, no render-thread round-trip.
            self.base.set_state(AudioContextState::Closed);
        }

        log::debug!("Suspended audio graph. Closing audio stream..");
        backend_manager_guard
            .close()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

        log::debug!("Closed audio stream");
    }
762
763 #[must_use]
766 pub fn create_media_stream_source(
767 &self,
768 media: &MediaStream,
769 ) -> node::MediaStreamAudioSourceNode {
770 let opts = node::MediaStreamAudioSourceOptions {
771 media_stream: media,
772 };
773 node::MediaStreamAudioSourceNode::new(self, opts)
774 }
775
776 #[must_use]
778 pub fn create_media_stream_destination(&self) -> node::MediaStreamAudioDestinationNode {
779 let opts = AudioNodeOptions::default();
780 node::MediaStreamAudioDestinationNode::new(self, opts)
781 }
782
783 #[must_use]
786 pub fn create_media_stream_track_source(
787 &self,
788 media: &MediaStreamTrack,
789 ) -> node::MediaStreamTrackAudioSourceNode {
790 let opts = node::MediaStreamTrackAudioSourceOptions {
791 media_stream_track: media,
792 };
793 node::MediaStreamTrackAudioSourceNode::new(self, opts)
794 }
795
796 #[must_use]
799 pub fn create_media_element_source(
800 &self,
801 media_element: &mut MediaElement,
802 ) -> node::MediaElementAudioSourceNode {
803 let opts = node::MediaElementAudioSourceOptions { media_element };
804 node::MediaElementAudioSourceNode::new(self, opts)
805 }
806}
807
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor;

    // All tests use sink_id "none" so they run without real audio hardware.

    #[test]
    fn test_suspend_resume_close() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let context = AudioContext::new(options);

        executor::block_on(context.resume());
        assert_eq!(context.state(), AudioContextState::Running);

        executor::block_on(context.suspend());
        assert_eq!(context.state(), AudioContextState::Suspended);
        let time1 = context.current_time();
        assert!(time1 >= 0.);

        // Time must not progress while suspended.
        std::thread::sleep(std::time::Duration::from_millis(1));
        let time2 = context.current_time();
        assert_eq!(time1, time2);

        executor::block_on(context.resume());
        assert_eq!(context.state(), AudioContextState::Running);

        std::thread::sleep(std::time::Duration::from_millis(1));

        // Time must progress again after resuming.
        let time3 = context.current_time();
        assert!(time3 > time2);

        executor::block_on(context.close());
        assert_eq!(context.state(), AudioContextState::Closed);

        let time4 = context.current_time();

        std::thread::sleep(std::time::Duration::from_millis(1));

        // Time must not progress after close.
        let time5 = context.current_time();
        assert_eq!(time5, time4);
    }

    // Suspending before the stream has started must still freeze time
    // (exercises the `startup_pending` path in `suspend`).
    #[test]
    fn test_suspend_during_startup() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let context = AudioContext::new(options);

        executor::block_on(context.suspend());
        assert_eq!(context.state(), AudioContextState::Suspended);

        let time1 = context.current_time();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let time2 = context.current_time();
        assert_eq!(time1, time2);
    }

    // Same as above for the blocking variant.
    #[test]
    fn test_suspend_sync_during_startup() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let context = AudioContext::new(options);

        context.suspend_sync();
        assert_eq!(context.state(), AudioContextState::Suspended);

        let time1 = context.current_time();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let time2 = context.current_time();
        assert_eq!(time1, time2);
    }

    // Compile-time helper: accepts only Send + Sync values.
    fn require_send_sync<T: Send + Sync>(_: T) {}

    // The futures returned by the async lifecycle methods must be
    // Send + Sync so they can be driven from any executor.
    #[test]
    fn test_all_futures_thread_safe() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };
        let context = AudioContext::new(options);

        require_send_sync(context.suspend());
        require_send_sync(context.resume());
        require_send_sync(context.close());
    }

    #[test]
    #[should_panic]
    fn test_invalid_sink_id() {
        let options = AudioContextOptions {
            sink_id: "invalid".into(),
            ..AudioContextOptions::default()
        };
        let _ = AudioContext::new(options);
    }

    #[test]
    fn test_try_new_invalid_sink_id() {
        let options = AudioContextOptions {
            sink_id: "invalid".into(),
            ..AudioContextOptions::default()
        };

        let error = AudioContext::try_new(options).unwrap_err();
        assert_eq!(
            error.to_string(),
            "NotFoundError - Invalid sinkId: \"invalid\""
        );
    }
}