1use std::error::Error;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5
6use crate::context::{AudioContextState, BaseAudioContext, ConcreteBaseAudioContext};
7use crate::events::{EventDispatch, EventHandler, EventLoop, EventPayload, EventType};
8use crate::io::{self, AudioBackendManager, ControlThreadInit, NoneBackend, RenderThreadInit};
9use crate::media_devices::{enumerate_devices_sync, MediaDeviceInfoKind};
10use crate::media_streams::{MediaStream, MediaStreamTrack};
11use crate::message::{ControlMessage, OneshotNotify};
12use crate::node::{self, AudioNodeOptions};
13use crate::render::graph::Graph;
14use crate::MediaElement;
15use crate::{is_valid_sample_rate, AudioPlaybackStats, AudioRenderCapacity, Event};
16
17use futures_channel::oneshot;
18
19fn is_valid_sink_id(sink_id: &str) -> bool {
23 if sink_id.is_empty() || sink_id == "none" {
24 true
25 } else {
26 enumerate_devices_sync()
27 .into_iter()
28 .filter(|d| d.kind() == MediaDeviceInfoKind::AudioOutput)
29 .any(|d| d.device_id() == sink_id)
30 }
31}
32
33#[derive(Debug)]
34enum AudioContextError {
35 SinkNotFound { sink_id: String },
36 InvalidSampleRate { sample_rate: f32 },
37 Backend { error: io::AudioBackendError },
38}
39
40impl std::fmt::Display for AudioContextError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::SinkNotFound { sink_id } => {
44 write!(f, "NotFoundError - Invalid sinkId: {sink_id:?}")
45 }
46 Self::InvalidSampleRate { sample_rate } => {
47 write!(
48 f,
49 "NotSupportedError - Invalid sample rate: {sample_rate}, should be in the range [3000.0, 768000.0]"
50 )
51 }
52 Self::Backend { error } => write!(f, "InvalidStateError - {error}"),
53 }
54 }
55}
56
57impl Error for AudioContextError {}
58
59impl From<io::AudioBackendError> for AudioContextError {
60 fn from(error: io::AudioBackendError) -> Self {
61 Self::Backend { error }
62 }
63}
64
65#[derive(Copy, Clone, Debug, Default)]
68pub enum AudioContextLatencyCategory {
69 Balanced,
71 #[default]
73 Interactive,
74 Playback,
78 Custom(f64),
82}
83
84#[derive(Copy, Clone, Debug)]
85#[non_exhaustive]
86#[derive(Default)]
90pub enum AudioContextRenderSizeCategory {
91 #[default]
93 Default,
94}
95
96#[derive(Clone, Debug, Default)]
112pub struct AudioContextOptions {
113 pub latency_hint: AudioContextLatencyCategory,
116
117 pub sample_rate: Option<f32>,
119
120 pub sink_id: String,
125
126 pub render_size_hint: AudioContextRenderSizeCategory,
128}
129
130#[allow(clippy::module_name_repetitions)]
134pub struct AudioContext {
135 base: ConcreteBaseAudioContext,
137 backend_manager: Mutex<Box<dyn AudioBackendManager>>,
139 render_capacity: AudioRenderCapacity,
141 playback_stats: AudioPlaybackStats,
143 startup_pending: std::sync::Arc<AtomicBool>,
145 render_thread_init: RenderThreadInit,
147}
148
149impl std::fmt::Debug for AudioContext {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("AudioContext")
152 .field("sink_id", &self.sink_id())
153 .field("base_latency", &self.base_latency())
154 .field("output_latency", &self.output_latency())
155 .field("base", &self.base())
156 .finish_non_exhaustive()
157 }
158}
159
160impl Drop for AudioContext {
161 fn drop(&mut self) {
162 if self.state() == AudioContextState::Running {
164 let tombstone = Box::new(NoneBackend::void());
165 let original = std::mem::replace(self.backend_manager.get_mut().unwrap(), tombstone);
166 Box::leak(original);
167 }
168 }
169}
170
171impl BaseAudioContext for AudioContext {
172 fn base(&self) -> &ConcreteBaseAudioContext {
173 &self.base
174 }
175}
176
177impl Default for AudioContext {
178 fn default() -> Self {
179 Self::new(AudioContextOptions::default())
180 }
181}
182
183impl AudioContext {
184 #[must_use]
211 pub fn new(options: AudioContextOptions) -> Self {
212 Self::try_new_inner(options).unwrap_or_else(|e| panic!("{e}"))
213 }
214
215 pub fn try_new(options: AudioContextOptions) -> Result<Self, Box<dyn Error>> {
226 Self::try_new_inner(options).map_err(Into::into)
227 }
228
229 fn try_new_inner(options: AudioContextOptions) -> Result<Self, AudioContextError> {
230 if !is_valid_sink_id(&options.sink_id) {
232 return Err(AudioContextError::SinkNotFound {
233 sink_id: options.sink_id,
234 });
235 }
236
237 if let Some(sample_rate) = options.sample_rate {
240 if !is_valid_sample_rate(sample_rate) {
241 return Err(AudioContextError::InvalidSampleRate { sample_rate });
242 }
243 }
244
245 let (control_thread_init, render_thread_init) = io::thread_init();
247 let startup_pending = Arc::clone(&render_thread_init.startup_pending);
248 let backend = io::build_output(options, render_thread_init.clone())?;
249
250 let ControlThreadInit {
251 state,
252 frames_played,
253 stats,
254 ctrl_msg_send,
255 event_send,
256 event_recv,
257 } = control_thread_init;
258
259 let (node_id_producer, node_id_consumer) = llq::Queue::new().split();
261 let graph = Graph::new(node_id_producer);
262 let message = ControlMessage::Startup { graph };
263 ctrl_msg_send.send(message).unwrap();
264
265 let event_loop = EventLoop::new(event_recv);
267
268 let base = ConcreteBaseAudioContext::new(
270 backend.sample_rate(),
271 backend.number_of_channels(),
272 state,
273 frames_played,
274 ctrl_msg_send,
275 event_send,
276 event_loop.clone(),
277 false,
278 node_id_consumer,
279 );
280
281 let render_capacity = AudioRenderCapacity::new(base.clone(), stats.clone());
283 let playback_stats = AudioPlaybackStats::new(base.clone(), stats);
284
285 event_loop.run_in_thread();
289
290 Ok(Self {
291 base,
292 backend_manager: Mutex::new(backend),
293 render_capacity,
294 playback_stats,
295 startup_pending,
296 render_thread_init,
297 })
298 }
299
300 #[allow(clippy::unused_self)]
306 #[must_use]
307 pub fn base_latency(&self) -> f64 {
308 0.
309 }
310
311 #[must_use]
316 #[allow(clippy::missing_panics_doc)]
317 pub fn output_latency(&self) -> f64 {
318 self.try_output_latency()
319 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"))
320 }
321
322 fn try_output_latency(&self) -> Result<f64, Box<dyn Error>> {
328 Ok(self.backend_manager.lock().unwrap().output_latency()?)
329 }
330
331 #[allow(clippy::missing_panics_doc)]
335 pub fn sink_id(&self) -> String {
336 self.backend_manager.lock().unwrap().sink_id().to_owned()
337 }
338
339 #[must_use]
341 pub fn render_capacity(&self) -> AudioRenderCapacity {
342 self.render_capacity.clone()
343 }
344
345 #[must_use]
347 pub fn playback_stats(&self) -> AudioPlaybackStats {
348 self.playback_stats.clone()
349 }
350
351 #[allow(clippy::needless_collect, clippy::missing_panics_doc)]
361 pub fn set_sink_id_sync(&self, sink_id: String) -> Result<(), Box<dyn Error>> {
362 log::debug!("SinkChange requested");
363 if self.sink_id() == sink_id {
364 log::debug!("SinkChange: no-op");
365 return Ok(()); }
367
368 if !is_valid_sink_id(&sink_id) {
369 Err(format!("NotFoundError: invalid sinkId {sink_id}"))?;
370 };
371
372 log::debug!("SinkChange: locking backend manager");
373 let mut backend_manager_guard = self.backend_manager.lock().unwrap();
374 let original_state = self.state();
375 if original_state == AudioContextState::Closed {
376 log::debug!("SinkChange: context is closed");
377 return Ok(());
378 }
379
380 log::debug!("SinkChange: locking message channel");
382 let ctrl_msg_send = self.base.lock_control_msg_sender();
383
384 let mut pending_msgs: Vec<_> = self.render_thread_init.ctrl_msg_recv.try_iter().collect();
386
387 let graph = if matches!(pending_msgs.first(), Some(ControlMessage::Startup { .. })) {
389 log::debug!("SinkChange: recover unstarted graph");
392
393 let msg = pending_msgs.remove(0);
394 match msg {
395 ControlMessage::Startup { graph } => graph,
396 _ => unreachable!(),
397 }
398 } else {
399 log::debug!("SinkChange: recover graph from render thread");
401
402 let (graph_send, graph_recv) = crossbeam_channel::bounded(1);
403 let message = ControlMessage::CloseAndRecycle { sender: graph_send };
404 ctrl_msg_send.send(message).unwrap();
405 if original_state == AudioContextState::Suspended {
406 backend_manager_guard.resume()?;
409 }
410 graph_recv.recv().unwrap()
411 };
412
413 log::debug!("SinkChange: closing audio stream");
414 backend_manager_guard.close()?;
415
416 let options = AudioContextOptions {
418 sample_rate: Some(self.sample_rate()),
419 latency_hint: AudioContextLatencyCategory::default(), sink_id,
421 render_size_hint: AudioContextRenderSizeCategory::default(), };
423 log::debug!("SinkChange: starting audio stream");
424 *backend_manager_guard = io::build_output(options, self.render_thread_init.clone())?;
425
426 if original_state == AudioContextState::Suspended {
428 log::debug!("SinkChange: suspending audio stream");
429 backend_manager_guard.suspend()?;
430 }
431
432 let message = ControlMessage::Startup { graph };
434 ctrl_msg_send.send(message).unwrap();
435
436 pending_msgs
438 .into_iter()
439 .for_each(|m| self.base().send_control_msg(m));
440
441 drop(backend_manager_guard);
443
444 let _ = self.base.send_event(EventDispatch::sink_change());
446
447 log::debug!("SinkChange: done");
448 Ok(())
449 }
450
451 pub fn set_onsinkchange<F: FnMut(Event) + Send + 'static>(&self, mut callback: F) {
456 let callback = move |_| {
457 callback(Event {
458 type_: "sinkchange",
459 })
460 };
461
462 self.base().set_event_handler(
463 EventType::SinkChange,
464 EventHandler::Multiple(Box::new(callback)),
465 );
466 }
467
468 pub fn clear_onsinkchange(&self) {
470 self.base().clear_event_handler(EventType::SinkChange);
471 }
472
473 #[allow(clippy::missing_panics_doc)]
474 #[doc(hidden)] pub fn run_diagnostics<F: Fn(String) + Send + 'static>(&self, callback: F) {
476 let mut buffer = Vec::with_capacity(32 * 1024);
477 {
478 let backend = self.backend_manager.lock().unwrap();
479 use std::io::Write;
480 writeln!(&mut buffer, "backend: {}", backend.name()).ok();
481 writeln!(&mut buffer, "sink id: {}", backend.sink_id()).ok();
482 writeln!(
483 &mut buffer,
484 "output latency: {:.6}",
485 backend.output_latency().unwrap_or(0.)
486 )
487 .ok();
488 }
489 let callback = move |v| match v {
490 EventPayload::Diagnostics(v) => {
491 let s = String::from_utf8(v).unwrap();
492 callback(s);
493 }
494 _ => unreachable!(),
495 };
496
497 self.base().set_event_handler(
498 EventType::Diagnostics,
499 EventHandler::Once(Box::new(callback)),
500 );
501
502 self.base()
503 .send_control_msg(ControlMessage::RunDiagnostics { buffer });
504 }
505
506 pub async fn suspend(&self) {
518 log::debug!("Suspend called");
520
521 let state = self.state();
522 if state == AudioContextState::Closed {
523 log::debug!("Suspend no-op - context is closed");
524 return;
525 }
526
527 if state != AudioContextState::Running && !self.startup_pending.load(Ordering::Acquire) {
528 log::debug!("Suspend no-op - context is not running");
529 return;
530 }
531
532 let (sender, receiver) = oneshot::channel();
534 let notify = OneshotNotify::Async(sender);
535 self.base
536 .suspend_control_msgs(ControlMessage::Suspend { notify });
537
538 log::debug!("Suspending audio graph, waiting for signal..");
541 receiver.await.unwrap();
542
543 log::debug!("Suspended audio graph. Suspending audio stream..");
545 self.backend_manager
546 .lock()
547 .unwrap()
548 .suspend()
549 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));
550
551 log::debug!("Suspended audio stream");
552 }
553
554 pub async fn resume(&self) {
564 let (sender, receiver) = oneshot::channel();
565
566 {
567 log::debug!("Resume called, locking backend manager");
569 let backend_manager_guard = self.backend_manager.lock().unwrap();
570
571 if self.state() != AudioContextState::Suspended {
572 log::debug!("Resume no-op - context is not suspended");
573 return;
574 }
575
576 backend_manager_guard
578 .resume()
579 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));
580
581 log::debug!("Resumed audio stream, waking audio graph");
583 let notify = OneshotNotify::Async(sender);
584 self.base
585 .resume_control_msgs(ControlMessage::Resume { notify });
586
587 }
589
590 receiver.await.unwrap();
593 log::debug!("Resumed audio graph");
594 }
595
596 pub async fn close(&self) {
605 log::debug!("Close called");
607
608 if self.state() == AudioContextState::Closed {
609 log::debug!("Close no-op - context is already closed");
610 return;
611 }
612
613 self.render_capacity.stop();
615
616 if self.state() == AudioContextState::Running {
617 let (sender, receiver) = oneshot::channel();
619 let notify = OneshotNotify::Async(sender);
620 self.base.send_control_msg(ControlMessage::Close { notify });
621
622 log::debug!("Suspending audio graph, waiting for signal..");
625 receiver.await.unwrap();
626 } else {
627 self.base.set_state(AudioContextState::Closed);
629 }
630
631 log::debug!("Suspended audio graph. Closing audio stream..");
633 self.backend_manager
634 .lock()
635 .unwrap()
636 .close()
637 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));
638
639 log::debug!("Closed audio stream");
640 }
641
642 pub fn suspend_sync(&self) {
657 log::debug!("Suspend_sync called, locking backend manager");
659 let backend_manager_guard = self.backend_manager.lock().unwrap();
660
661 let state = self.state();
662 if state == AudioContextState::Closed {
663 log::debug!("Suspend_sync no-op - context is closed");
664 return;
665 }
666
667 if state != AudioContextState::Running && !self.startup_pending.load(Ordering::Acquire) {
668 log::debug!("Suspend_sync no-op - context is not running");
669 return;
670 }
671
672 let (sender, receiver) = crossbeam_channel::bounded(0);
674 let notify = OneshotNotify::Sync(sender);
675 self.base
676 .suspend_control_msgs(ControlMessage::Suspend { notify });
677
678 log::debug!("Suspending audio graph, waiting for signal..");
681 receiver.recv().ok();
682
683 log::debug!("Suspended audio graph. Suspending audio stream..");
685 backend_manager_guard
686 .suspend()
687 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));
688
689 log::debug!("Suspended audio stream");
690 }
691
692 pub fn resume_sync(&self) {
705 log::debug!("Resume_sync called, locking backend manager");
707 let backend_manager_guard = self.backend_manager.lock().unwrap();
708
709 if self.state() != AudioContextState::Suspended {
710 log::debug!("Resume no-op - context is not suspended");
711 return;
712 }
713
714 backend_manager_guard
716 .resume()
717 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));
718
719 log::debug!("Resumed audio stream, waking audio graph");
721 let (sender, receiver) = crossbeam_channel::bounded(0);
722 let notify = OneshotNotify::Sync(sender);
723 self.base
724 .resume_control_msgs(ControlMessage::Resume { notify });
725
726 receiver.recv().ok();
729 log::debug!("Resumed audio graph");
730 }
731
732 pub fn close_sync(&self) {
744 log::debug!("Close_sync called, locking backend manager");
746 let backend_manager_guard = self.backend_manager.lock().unwrap();
747
748 if self.state() == AudioContextState::Closed {
749 log::debug!("Close no-op - context is already closed");
750 return;
751 }
752
753 self.render_capacity.stop();
755
756 if self.state() == AudioContextState::Running {
758 let (sender, receiver) = crossbeam_channel::bounded(0);
759 let notify = OneshotNotify::Sync(sender);
760 self.base.send_control_msg(ControlMessage::Close { notify });
761
762 log::debug!("Suspending audio graph, waiting for signal..");
765 receiver.recv().ok();
766 } else {
767 self.base.set_state(AudioContextState::Closed);
769 }
770
771 log::debug!("Suspended audio graph. Closing audio stream..");
773 backend_manager_guard
774 .close()
775 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));
776
777 log::debug!("Closed audio stream");
778 }
779
780 #[must_use]
783 pub fn create_media_stream_source(
784 &self,
785 media: &MediaStream,
786 ) -> node::MediaStreamAudioSourceNode {
787 let opts = node::MediaStreamAudioSourceOptions {
788 media_stream: media,
789 };
790 node::MediaStreamAudioSourceNode::new(self, opts)
791 }
792
793 #[must_use]
795 pub fn create_media_stream_destination(&self) -> node::MediaStreamAudioDestinationNode {
796 let opts = AudioNodeOptions::default();
797 node::MediaStreamAudioDestinationNode::new(self, opts)
798 }
799
800 #[must_use]
803 pub fn create_media_stream_track_source(
804 &self,
805 media: &MediaStreamTrack,
806 ) -> node::MediaStreamTrackAudioSourceNode {
807 let opts = node::MediaStreamTrackAudioSourceOptions {
808 media_stream_track: media,
809 };
810 node::MediaStreamTrackAudioSourceNode::new(self, opts)
811 }
812
813 #[must_use]
816 pub fn create_media_element_source(
817 &self,
818 media_element: &mut MediaElement,
819 ) -> node::MediaElementAudioSourceNode {
820 let opts = node::MediaElementAudioSourceOptions { media_element };
821 node::MediaElementAudioSourceNode::new(self, opts)
822 }
823}
824
825#[cfg(test)]
826mod tests {
827 use super::*;
828 use futures::executor;
829
830 #[test]
831 fn test_suspend_resume_close() {
832 let options = AudioContextOptions {
833 sink_id: "none".into(),
834 ..AudioContextOptions::default()
835 };
836
837 let context = AudioContext::new(options);
839
840 executor::block_on(context.resume());
842 assert_eq!(context.state(), AudioContextState::Running);
843
844 executor::block_on(context.suspend());
845 assert_eq!(context.state(), AudioContextState::Suspended);
846 let time1 = context.current_time();
847 assert!(time1 >= 0.);
848
849 std::thread::sleep(std::time::Duration::from_millis(1));
851 let time2 = context.current_time();
852 assert_eq!(time1, time2); executor::block_on(context.resume());
855 assert_eq!(context.state(), AudioContextState::Running);
856
857 std::thread::sleep(std::time::Duration::from_millis(1));
859
860 let time3 = context.current_time();
861 assert!(time3 > time2); executor::block_on(context.close());
864 assert_eq!(context.state(), AudioContextState::Closed);
865
866 let time4 = context.current_time();
867
868 std::thread::sleep(std::time::Duration::from_millis(1));
870
871 let time5 = context.current_time();
872 assert_eq!(time5, time4); }
874
875 #[test]
876 fn test_suspend_during_startup() {
877 let options = AudioContextOptions {
878 sink_id: "none".into(),
879 ..AudioContextOptions::default()
880 };
881
882 let context = AudioContext::new(options);
883
884 executor::block_on(context.suspend());
885 assert_eq!(context.state(), AudioContextState::Suspended);
886
887 let time1 = context.current_time();
888 std::thread::sleep(std::time::Duration::from_millis(5));
889 let time2 = context.current_time();
890 assert_eq!(time1, time2);
891 }
892
893 #[test]
894 fn test_suspend_sync_during_startup() {
895 let options = AudioContextOptions {
896 sink_id: "none".into(),
897 ..AudioContextOptions::default()
898 };
899
900 let context = AudioContext::new(options);
901
902 context.suspend_sync();
903 assert_eq!(context.state(), AudioContextState::Suspended);
904
905 let time1 = context.current_time();
906 std::thread::sleep(std::time::Duration::from_millis(5));
907 let time2 = context.current_time();
908 assert_eq!(time1, time2);
909 }
910
911 fn require_send_sync<T: Send + Sync>(_: T) {}
912
913 #[test]
914 fn test_all_futures_thread_safe() {
915 let options = AudioContextOptions {
916 sink_id: "none".into(),
917 ..AudioContextOptions::default()
918 };
919 let context = AudioContext::new(options);
920
921 require_send_sync(context.suspend());
922 require_send_sync(context.resume());
923 require_send_sync(context.close());
924 }
925
926 #[test]
927 fn test_try_new_invalid_sample_rate() {
928 let options = AudioContextOptions {
929 sample_rate: Some(0.),
930 sink_id: "none".into(),
931 ..AudioContextOptions::default()
932 };
933
934 let result = AudioContext::try_new(options);
935 assert!(result.is_err());
936 let error_msg = result.unwrap_err().to_string();
937 assert!(error_msg.contains("Invalid sample rate"));
938 }
939
940 #[test]
941 #[should_panic]
942 fn test_invalid_sink_id() {
943 let options = AudioContextOptions {
944 sink_id: "invalid".into(),
945 ..AudioContextOptions::default()
946 };
947 let _ = AudioContext::new(options);
948 }
949
950 #[test]
951 fn test_try_new_invalid_sink_id() {
952 let options = AudioContextOptions {
953 sink_id: "invalid".into(),
954 ..AudioContextOptions::default()
955 };
956
957 let error = AudioContext::try_new(options).unwrap_err();
958 assert_eq!(
959 error.to_string(),
960 "NotFoundError - Invalid sinkId: \"invalid\""
961 );
962 }
963}