1use std::error::Error;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5
6#[cfg(feature = "diagnostics")]
7use crate::context::{AudioBackendDiagnostics, AudioContextDiagnostics};
8use crate::context::{AudioContextState, BaseAudioContext, ConcreteBaseAudioContext};
9#[cfg(feature = "diagnostics")]
10use crate::events::EventPayload;
11use crate::events::{EventDispatch, EventHandler, EventLoop, EventType};
12use crate::io::{self, AudioBackendManager, ControlThreadInit, NoneBackend, RenderThreadInit};
13use crate::media_devices::{enumerate_devices_sync, MediaDeviceInfoKind};
14use crate::media_streams::{MediaStream, MediaStreamTrack};
15use crate::message::{ControlMessage, OneshotNotify};
16use crate::node::{self, AudioNodeOptions};
17use crate::render::graph::Graph;
18use crate::MediaElement;
19use crate::{is_valid_sample_rate, AudioPlaybackStats, AudioRenderCapacity, Event};
20
21use futures_channel::oneshot;
22
23fn is_valid_sink_id(sink_id: &str) -> bool {
27 if sink_id.is_empty() || sink_id == "none" {
28 true
29 } else {
30 enumerate_devices_sync()
31 .into_iter()
32 .filter(|d| d.kind() == MediaDeviceInfoKind::AudioOutput)
33 .any(|d| d.device_id() == sink_id)
34 }
35}
36
/// Failures that can be reported while constructing an `AudioContext`.
///
/// The type is private to this module; callers only ever see it through its
/// `Display` output (panic message of `new`) or boxed as `dyn Error`
/// (`try_new`).
#[derive(Debug)]
enum AudioContextError {
    /// The requested sink identifier was rejected by `is_valid_sink_id`.
    SinkNotFound { sink_id: String },
    /// The requested sample rate was rejected by `is_valid_sample_rate`.
    InvalidSampleRate { sample_rate: f32 },
    /// An error produced by the audio backend layer (`io`).
    Backend { error: io::AudioBackendError },
}
43
44impl std::fmt::Display for AudioContextError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::SinkNotFound { sink_id } => {
48 write!(f, "NotFoundError - Invalid sinkId: {sink_id:?}")
49 }
50 Self::InvalidSampleRate { sample_rate } => {
51 write!(
52 f,
53 "NotSupportedError - Invalid sample rate: {sample_rate}, should be in the range [3000.0, 768000.0]"
54 )
55 }
56 Self::Backend { error } => write!(f, "InvalidStateError - {error}"),
57 }
58 }
59}
60
// Marker impl: lets `AudioContextError` be boxed as `Box<dyn Error>` (see `try_new`).
impl Error for AudioContextError {}
62
63impl From<io::AudioBackendError> for AudioContextError {
64 fn from(error: io::AudioBackendError) -> Self {
65 Self::Backend { error }
66 }
67}
68
/// Latency preference passed to the audio backend through
/// `AudioContextOptions::latency_hint`.
///
/// NOTE(review): the variant names mirror the Web Audio API
/// `AudioContextLatencyCategory`; how each one maps to a buffer size is
/// decided by the backend and is not visible in this file.
#[derive(Copy, Clone, Debug, Default)]
pub enum AudioContextLatencyCategory {
    /// Presumably a trade-off between latency and power consumption — confirm against the backend.
    Balanced,
    /// The default category (selected by `#[default]`).
    #[default]
    Interactive,
    /// Presumably favours uninterrupted playback over low latency — confirm against the backend.
    Playback,
    /// A caller-supplied value; presumably a latency in seconds — TODO confirm the unit.
    Custom(f64),
}
87
/// Render-size preference passed through `AudioContextOptions::render_size_hint`.
///
/// The enum is `#[non_exhaustive]`, so further categories can be added without
/// breaking downstream `match` expressions.
#[derive(Copy, Clone, Debug, Default)]
#[non_exhaustive]
pub enum AudioContextRenderSizeCategory {
    /// The only category currently defined; also the `Default` value.
    #[default]
    Default,
}
99
/// Construction options for an `AudioContext`.
///
/// `Default` yields the default latency category, no explicit sample rate, an
/// empty `sink_id` and the default render-size category.
#[derive(Clone, Debug, Default)]
pub struct AudioContextOptions {
    /// Latency preference forwarded to the backend.
    pub latency_hint: AudioContextLatencyCategory,

    /// Requested sample rate. When `Some`, the value is validated with
    /// `is_valid_sample_rate` during construction; `None` presumably lets the
    /// backend choose — confirm in `io::build_output`.
    pub sample_rate: Option<f32>,

    /// Identifier of the output device. The empty string and `"none"` are
    /// always accepted; any other value must match the id of an enumerated
    /// audio-output device.
    pub sink_id: String,

    /// Render-size preference forwarded to the backend.
    pub render_size_hint: AudioContextRenderSizeCategory,
}
133
134#[allow(clippy::module_name_repetitions)]
138pub struct AudioContext {
139 base: ConcreteBaseAudioContext,
141 backend_manager: Mutex<Box<dyn AudioBackendManager>>,
143 render_capacity: AudioRenderCapacity,
145 playback_stats: AudioPlaybackStats,
147 startup_pending: std::sync::Arc<AtomicBool>,
149 render_thread_init: RenderThreadInit,
151}
152
153impl std::fmt::Debug for AudioContext {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("AudioContext")
156 .field("sink_id", &self.sink_id())
157 .field("base_latency", &self.base_latency())
158 .field("output_latency", &self.output_latency())
159 .field("base", &self.base())
160 .finish_non_exhaustive()
161 }
162}
163
164impl Drop for AudioContext {
165 fn drop(&mut self) {
166 if self.state() == AudioContextState::Running {
168 let tombstone = Box::new(NoneBackend::void());
169 let original = std::mem::replace(self.backend_manager.get_mut().unwrap(), tombstone);
170 Box::leak(original);
171 }
172 }
173}
174
/// Gives `AudioContext` the `BaseAudioContext` API by exposing its inner
/// `ConcreteBaseAudioContext`.
impl BaseAudioContext for AudioContext {
    /// Returns a reference to the shared base context.
    fn base(&self) -> &ConcreteBaseAudioContext {
        &self.base
    }
}
180
181impl Default for AudioContext {
182 fn default() -> Self {
183 Self::new(AudioContextOptions::default())
184 }
185}
186
187impl AudioContext {
188 #[must_use]
215 pub fn new(options: AudioContextOptions) -> Self {
216 Self::try_new_inner(options).unwrap_or_else(|e| panic!("{e}"))
217 }
218
219 pub fn try_new(options: AudioContextOptions) -> Result<Self, Box<dyn Error>> {
230 Self::try_new_inner(options).map_err(Into::into)
231 }
232
/// Shared implementation of `new` and `try_new`.
///
/// Validates the options, builds the audio backend, ships the initial graph
/// to the render side, creates the base context and starts the event loop
/// thread.
///
/// # Errors
///
/// * `SinkNotFound` when `options.sink_id` fails `is_valid_sink_id`.
/// * `InvalidSampleRate` when a sample rate is given and fails `is_valid_sample_rate`.
/// * `Backend` when `io::build_output` fails.
fn try_new_inner(options: AudioContextOptions) -> Result<Self, AudioContextError> {
    // Validate before any thread or stream is created.
    if !is_valid_sink_id(&options.sink_id) {
        return Err(AudioContextError::SinkNotFound {
            sink_id: options.sink_id,
        });
    }

    // `None` skips validation here.
    if let Some(sample_rate) = options.sample_rate {
        if !is_valid_sample_rate(sample_rate) {
            return Err(AudioContextError::InvalidSampleRate { sample_rate });
        }
    }

    // Paired init bundles for the control side and the render side.
    let (control_thread_init, render_thread_init) = io::thread_init();
    // Keep our own handle on the flag; the suspend methods read it.
    let startup_pending = Arc::clone(&render_thread_init.startup_pending);
    // The backend gets a clone; the original is stored for later sink changes.
    let backend = io::build_output(options, render_thread_init.clone())?;

    let ControlThreadInit {
        state,
        frames_played,
        stats,
        ctrl_msg_send,
        event_send,
        event_recv,
    } = control_thread_init;

    // The graph keeps the producer half; the base context gets the consumer half.
    let (node_id_producer, node_id_consumer) = llq::Queue::new().split();
    let graph = Graph::new(node_id_producer);
    // `Startup` is queued before the sender is handed to the base context, so
    // it is the first control message on the channel.
    let message = ControlMessage::Startup { graph };
    ctrl_msg_send.send(message).unwrap();

    let event_loop = EventLoop::new(event_recv);

    let base = ConcreteBaseAudioContext::new(
        backend.sample_rate(),
        backend.number_of_channels(),
        state,
        frames_played,
        ctrl_msg_send,
        event_send,
        event_loop.clone(),
        // NOTE(review): presumably the "offline" flag — confirm against
        // `ConcreteBaseAudioContext::new`.
        false,
        node_id_consumer,
    );

    let render_capacity = AudioRenderCapacity::new(base.clone(), stats.clone());
    let playback_stats = AudioPlaybackStats::new(base.clone(), stats);

    // Started only after the base context and its helpers exist.
    event_loop.run_in_thread();

    Ok(Self {
        base,
        backend_manager: Mutex::new(backend),
        render_capacity,
        playback_stats,
        startup_pending,
        render_thread_init,
    })
}
303
304 #[allow(clippy::unused_self)]
310 #[must_use]
311 pub fn base_latency(&self) -> f64 {
312 0.
313 }
314
315 #[must_use]
320 #[allow(clippy::missing_panics_doc)]
321 pub fn output_latency(&self) -> f64 {
322 self.try_output_latency()
323 .unwrap_or_else(|e| panic!("InvalidStateError - {e}"))
324 }
325
326 fn try_output_latency(&self) -> Result<f64, Box<dyn Error>> {
332 Ok(self.backend_manager.lock().unwrap().output_latency()?)
333 }
334
335 #[allow(clippy::missing_panics_doc)]
339 pub fn sink_id(&self) -> String {
340 self.backend_manager.lock().unwrap().sink_id().to_owned()
341 }
342
343 #[must_use]
345 pub fn render_capacity(&self) -> AudioRenderCapacity {
346 self.render_capacity.clone()
347 }
348
349 #[must_use]
351 pub fn playback_stats(&self) -> AudioPlaybackStats {
352 self.playback_stats.clone()
353 }
354
/// Switches the output to the device identified by `sink_id`, blocking until
/// the swap is complete.
///
/// The audio graph is recovered from the current render side, the current
/// backend is closed, a new backend is built for `sink_id`, and the graph is
/// shipped to it. A sink-change event is dispatched afterwards.
///
/// Returns `Ok(())` without doing anything when `sink_id` is already active
/// or when the context is closed.
///
/// # Errors
///
/// * `"NotFoundError: invalid sinkId …"` when `sink_id` fails `is_valid_sink_id`.
/// * Any error from the backend's `resume`, `close` or `suspend`, or from
///   `io::build_output`.
///
/// NOTE(review): if `io::build_output` fails, the previous backend has already
/// been closed and the recovered graph is dropped with the error.
///
/// # Panics
///
/// Panics if the backend mutex is poisoned or a control/graph channel is
/// disconnected.
#[allow(clippy::needless_collect, clippy::missing_panics_doc)]
pub fn set_sink_id_sync(&self, sink_id: String) -> Result<(), Box<dyn Error>> {
    log::debug!("SinkChange requested");
    if self.sink_id() == sink_id {
        log::debug!("SinkChange: no-op");
        // The requested sink is already the active one.
        return Ok(());
    }

    if !is_valid_sink_id(&sink_id) {
        Err(format!("NotFoundError: invalid sinkId {sink_id}"))?;
    };

    log::debug!("SinkChange: locking backend manager");
    let mut backend_manager_guard = self.backend_manager.lock().unwrap();
    let original_state = self.state();
    if original_state == AudioContextState::Closed {
        log::debug!("SinkChange: context is closed");
        return Ok(());
    }

    // Take the control-message sender so nothing else is sent mid-swap.
    log::debug!("SinkChange: locking message channel");
    let ctrl_msg_send = self.base.lock_control_msg_sender();

    // Drain messages the render side has not consumed yet; they are re-sent
    // to the new backend below.
    let mut pending_msgs: Vec<_> = self.render_thread_init.ctrl_msg_recv.try_iter().collect();

    let graph = if matches!(pending_msgs.first(), Some(ControlMessage::Startup { .. })) {
        // The `Startup` message was never consumed, so the graph is still
        // inside it.
        log::debug!("SinkChange: recover unstarted graph");

        let msg = pending_msgs.remove(0);
        match msg {
            ControlMessage::Startup { graph } => graph,
            // Guarded by the `matches!` above.
            _ => unreachable!(),
        }
    } else {
        // Ask the render side to shut down and hand the graph back.
        log::debug!("SinkChange: recover graph from render thread");

        let (graph_send, graph_recv) = crossbeam_channel::bounded(1);
        let message = ControlMessage::CloseAndRecycle { sender: graph_send };
        ctrl_msg_send.send(message).unwrap();
        if original_state == AudioContextState::Suspended {
            // Presumably a suspended backend does not process messages, so it
            // is resumed to handle `CloseAndRecycle` — confirm.
            backend_manager_guard.resume()?;
        }
        graph_recv.recv().unwrap()
    };

    log::debug!("SinkChange: closing audio stream");
    backend_manager_guard.close()?;

    // NOTE(review): latency and render-size hints are reset to their defaults
    // here rather than reusing the values the context was created with.
    let options = AudioContextOptions {
        sample_rate: Some(self.sample_rate()),
        latency_hint: AudioContextLatencyCategory::default(),
        sink_id,
        render_size_hint: AudioContextRenderSizeCategory::default(),
    };
    log::debug!("SinkChange: starting audio stream");
    *backend_manager_guard = io::build_output(options, self.render_thread_init.clone())?;

    // Restore the suspended state before the graph is delivered.
    if original_state == AudioContextState::Suspended {
        log::debug!("SinkChange: suspending audio stream");
        backend_manager_guard.suspend()?;
    }

    // Hand the recovered graph to the new backend.
    let message = ControlMessage::Startup { graph };
    ctrl_msg_send.send(message).unwrap();

    // Replay the messages drained earlier, in their original order.
    // NOTE(review): `ctrl_msg_send` is still alive here while
    // `send_control_msg` is called — confirm this cannot block on itself.
    pending_msgs
        .into_iter()
        .for_each(|m| self.base().send_control_msg(m));

    // Release the backend lock explicitly before dispatching the event.
    drop(backend_manager_guard);

    // Failure to deliver the event is deliberately ignored.
    let _ = self.base.send_event(EventDispatch::sink_change());

    log::debug!("SinkChange: done");
    Ok(())
}
454
455 pub fn set_onsinkchange<F: FnMut(Event) + Send + 'static>(&self, mut callback: F) {
460 let callback = move |_| {
461 callback(Event {
462 type_: "sinkchange",
463 })
464 };
465
466 self.base().set_event_handler(
467 EventType::SinkChange,
468 EventHandler::Multiple(Box::new(callback)),
469 );
470 }
471
472 pub fn clear_onsinkchange(&self) {
474 self.base().clear_event_handler(EventType::SinkChange);
475 }
476
/// Requests a diagnostics report and passes it to `callback` once.
///
/// Backend details (name, sink id, output latency if available) are captured
/// on the calling thread and sent along with the request; the rest of the
/// report arrives through a `Diagnostics` event. Only available with the
/// `diagnostics` feature.
///
/// Panics if the backend manager mutex is poisoned.
#[cfg(feature = "diagnostics")]
#[allow(clippy::missing_panics_doc)]
pub fn run_diagnostics<F: Fn(AudioContextDiagnostics) + Send + 'static>(&self, callback: F) {
    // Snapshot the backend while holding the lock, then release it at once.
    let guard = self.backend_manager.lock().unwrap();
    let backend = AudioBackendDiagnostics {
        name: guard.name().to_string(),
        sink_id: guard.sink_id().to_string(),
        // A backend that cannot report its latency yields `None`.
        output_latency: guard.output_latency().ok(),
    };
    drop(guard);

    let on_report = move |payload| match payload {
        EventPayload::Diagnostics(report) => {
            callback(report);
        }
        // Only diagnostics payloads are expected for this event type.
        _ => unreachable!(),
    };

    // Install the one-shot handler before sending the request.
    let base = self.base();
    base.set_event_handler(
        EventType::Diagnostics,
        EventHandler::Once(Box::new(on_report)),
    );
    base.send_control_msg(ControlMessage::RunDiagnostics { backend });
}
511
/// Suspends the context: first the audio graph, then the audio stream.
///
/// Does nothing when the context is closed, or when it is not running and no
/// startup is pending.
///
/// # Panics
///
/// Panics if the notification sender is dropped before signalling, if the
/// backend mutex is poisoned, or (with an `InvalidStateError - …` message) if
/// the backend fails to suspend.
pub async fn suspend(&self) {
    // The backend lock is only taken after the await below.
    log::debug!("Suspend called");

    let state = self.state();
    if state == AudioContextState::Closed {
        log::debug!("Suspend no-op - context is closed");
        return;
    }

    // A context still starting up is not `Running` yet but may be suspended.
    if state != AudioContextState::Running && !self.startup_pending.load(Ordering::Acquire) {
        log::debug!("Suspend no-op - context is not running");
        return;
    }

    // Ask the render side to suspend and notify us when done.
    let (sender, receiver) = oneshot::channel();
    let notify = OneshotNotify::Async(sender);
    self.base
        .suspend_control_msgs(ControlMessage::Suspend { notify });

    log::debug!("Suspending audio graph, waiting for signal..");
    receiver.await.unwrap();

    // Only now suspend the backend stream itself.
    log::debug!("Suspended audio graph. Suspending audio stream..");
    self.backend_manager
        .lock()
        .unwrap()
        .suspend()
        .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

    log::debug!("Suspended audio stream");
}
559
/// Resumes a suspended context: first the audio stream, then the audio graph.
///
/// Does nothing unless the context state is `Suspended`.
///
/// # Panics
///
/// Panics if the backend mutex is poisoned, if the backend fails to resume
/// (`InvalidStateError - …`), or if the notification sender is dropped before
/// signalling.
pub async fn resume(&self) {
    let (sender, receiver) = oneshot::channel();

    // Inner scope: the mutex guard is released before the await below.
    {
        log::debug!("Resume called, locking backend manager");
        let backend_manager_guard = self.backend_manager.lock().unwrap();

        if self.state() != AudioContextState::Suspended {
            log::debug!("Resume no-op - context is not suspended");
            return;
        }

        // Resume the stream before the graph is asked to resume.
        backend_manager_guard
            .resume()
            .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

        log::debug!("Resumed audio stream, waking audio graph");
        let notify = OneshotNotify::Async(sender);
        self.base
            .resume_control_msgs(ControlMessage::Resume { notify });
    }

    // Wait for the render side to confirm the resume.
    receiver.await.unwrap();
    log::debug!("Resumed audio graph");
}
601
/// Closes the context: stops render-capacity reporting, shuts down the audio
/// graph and closes the audio stream.
///
/// Does nothing when the context is already closed.
///
/// # Panics
///
/// Panics if the notification sender is dropped before signalling, if the
/// backend mutex is poisoned, or (with an `InvalidStateError - …` message) if
/// the backend fails to close.
pub async fn close(&self) {
    // The backend lock is only taken after the await below.
    log::debug!("Close called");

    if self.state() == AudioContextState::Closed {
        log::debug!("Close no-op - context is already closed");
        return;
    }

    self.render_capacity.stop();

    if self.state() == AudioContextState::Running {
        // Running: send `Close` and wait for the notification.
        let (sender, receiver) = oneshot::channel();
        let notify = OneshotNotify::Async(sender);
        self.base.send_control_msg(ControlMessage::Close { notify });

        log::debug!("Suspending audio graph, waiting for signal..");
        receiver.await.unwrap();
    } else {
        // Not running: no `Close` message is sent, so set the state directly.
        self.base.set_state(AudioContextState::Closed);
    }

    log::debug!("Suspended audio graph. Closing audio stream..");
    self.backend_manager
        .lock()
        .unwrap()
        .close()
        .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

    log::debug!("Closed audio stream");
}
647
/// Blocking variant of `suspend`.
///
/// Holds the backend manager lock for the whole operation. Does nothing when
/// the context is closed, or when it is not running and no startup is
/// pending.
///
/// # Panics
///
/// Panics if the backend mutex is poisoned or (with an
/// `InvalidStateError - …` message) if the backend fails to suspend.
pub fn suspend_sync(&self) {
    log::debug!("Suspend_sync called, locking backend manager");
    let backend_manager_guard = self.backend_manager.lock().unwrap();

    let state = self.state();
    if state == AudioContextState::Closed {
        log::debug!("Suspend_sync no-op - context is closed");
        return;
    }

    // A context still starting up is not `Running` yet but may be suspended.
    if state != AudioContextState::Running && !self.startup_pending.load(Ordering::Acquire) {
        log::debug!("Suspend_sync no-op - context is not running");
        return;
    }

    // Zero-capacity channel: the notifying send and this receive rendezvous.
    let (sender, receiver) = crossbeam_channel::bounded(0);
    let notify = OneshotNotify::Sync(sender);
    self.base
        .suspend_control_msgs(ControlMessage::Suspend { notify });

    log::debug!("Suspending audio graph, waiting for signal..");
    // A disconnected channel is tolerated here (result discarded).
    receiver.recv().ok();

    log::debug!("Suspended audio graph. Suspending audio stream..");
    backend_manager_guard
        .suspend()
        .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

    log::debug!("Suspended audio stream");
}
697
/// Blocking variant of `resume`.
///
/// Holds the backend manager lock for the whole operation. Does nothing
/// unless the context state is `Suspended`.
///
/// # Panics
///
/// Panics if the backend mutex is poisoned or (with an
/// `InvalidStateError - …` message) if the backend fails to resume.
pub fn resume_sync(&self) {
    log::debug!("Resume_sync called, locking backend manager");
    let backend_manager_guard = self.backend_manager.lock().unwrap();

    if self.state() != AudioContextState::Suspended {
        log::debug!("Resume no-op - context is not suspended");
        return;
    }

    // Resume the stream before the graph is asked to resume.
    backend_manager_guard
        .resume()
        .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

    log::debug!("Resumed audio stream, waking audio graph");
    let (sender, receiver) = crossbeam_channel::bounded(0);
    let notify = OneshotNotify::Sync(sender);
    self.base
        .resume_control_msgs(ControlMessage::Resume { notify });

    // A disconnected channel is tolerated here (result discarded).
    receiver.recv().ok();
    log::debug!("Resumed audio graph");
}
737
/// Blocking variant of `close`.
///
/// Holds the backend manager lock for the whole operation. Does nothing when
/// the context is already closed.
///
/// # Panics
///
/// Panics if the backend mutex is poisoned or (with an
/// `InvalidStateError - …` message) if the backend fails to close.
pub fn close_sync(&self) {
    log::debug!("Close_sync called, locking backend manager");
    let backend_manager_guard = self.backend_manager.lock().unwrap();

    if self.state() == AudioContextState::Closed {
        log::debug!("Close no-op - context is already closed");
        return;
    }

    self.render_capacity.stop();

    if self.state() == AudioContextState::Running {
        // Running: send `Close` and block until notified.
        let (sender, receiver) = crossbeam_channel::bounded(0);
        let notify = OneshotNotify::Sync(sender);
        self.base.send_control_msg(ControlMessage::Close { notify });

        log::debug!("Suspending audio graph, waiting for signal..");
        // A disconnected channel is tolerated here (result discarded).
        receiver.recv().ok();
    } else {
        // Not running: no `Close` message is sent, so set the state directly.
        self.base.set_state(AudioContextState::Closed);
    }

    log::debug!("Suspended audio graph. Closing audio stream..");
    backend_manager_guard
        .close()
        .unwrap_or_else(|e| panic!("InvalidStateError - {e}"));

    log::debug!("Closed audio stream");
}
785
786 #[must_use]
789 pub fn create_media_stream_source(
790 &self,
791 media: &MediaStream,
792 ) -> node::MediaStreamAudioSourceNode {
793 let opts = node::MediaStreamAudioSourceOptions {
794 media_stream: media,
795 };
796 node::MediaStreamAudioSourceNode::new(self, opts)
797 }
798
799 #[must_use]
801 pub fn create_media_stream_destination(&self) -> node::MediaStreamAudioDestinationNode {
802 let opts = AudioNodeOptions::default();
803 node::MediaStreamAudioDestinationNode::new(self, opts)
804 }
805
806 #[must_use]
809 pub fn create_media_stream_track_source(
810 &self,
811 media: &MediaStreamTrack,
812 ) -> node::MediaStreamTrackAudioSourceNode {
813 let opts = node::MediaStreamTrackAudioSourceOptions {
814 media_stream_track: media,
815 };
816 node::MediaStreamTrackAudioSourceNode::new(self, opts)
817 }
818
819 #[must_use]
822 pub fn create_media_element_source(
823 &self,
824 media_element: &mut MediaElement,
825 ) -> node::MediaElementAudioSourceNode {
826 let opts = node::MediaElementAudioSourceOptions { media_element };
827 node::MediaElementAudioSourceNode::new(self, opts)
828 }
829}
830
// Unit tests for `AudioContext`. Every test that constructs a context
// successfully uses the `"none"` sink id.
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "diagnostics")]
    use crate::context::DESTINATION_NODE_ID;
    use futures::executor;

    /// Walks a context through resume -> suspend -> resume -> close and checks
    /// that `current_time` only advances while the context is running.
    #[test]
    fn test_suspend_resume_close() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let context = AudioContext::new(options);

        executor::block_on(context.resume());
        assert_eq!(context.state(), AudioContextState::Running);

        executor::block_on(context.suspend());
        assert_eq!(context.state(), AudioContextState::Suspended);
        let time1 = context.current_time();
        assert!(time1 >= 0.);

        // While suspended the clock must not move.
        std::thread::sleep(std::time::Duration::from_millis(1));
        let time2 = context.current_time();
        assert_eq!(time1, time2);
        executor::block_on(context.resume());
        assert_eq!(context.state(), AudioContextState::Running);

        std::thread::sleep(std::time::Duration::from_millis(1));

        // After resuming the clock must have advanced.
        let time3 = context.current_time();
        assert!(time3 > time2);
        executor::block_on(context.close());
        assert_eq!(context.state(), AudioContextState::Closed);

        let time4 = context.current_time();

        std::thread::sleep(std::time::Duration::from_millis(1));

        // Once closed the clock is frozen.
        let time5 = context.current_time();
        assert_eq!(time5, time4);
    }

    /// Suspending right after construction must end in `Suspended` with a
    /// frozen clock.
    #[test]
    fn test_suspend_during_startup() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let context = AudioContext::new(options);

        executor::block_on(context.suspend());
        assert_eq!(context.state(), AudioContextState::Suspended);

        let time1 = context.current_time();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let time2 = context.current_time();
        assert_eq!(time1, time2);
    }

    /// Same as `test_suspend_during_startup`, using the blocking API.
    #[test]
    fn test_suspend_sync_during_startup() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let context = AudioContext::new(options);

        context.suspend_sync();
        assert_eq!(context.state(), AudioContextState::Suspended);

        let time1 = context.current_time();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let time2 = context.current_time();
        assert_eq!(time1, time2);
    }

    /// Compile-time helper: only accepts values that are `Send + Sync`.
    fn require_send_sync<T: Send + Sync>(_: T) {}

    /// The futures returned by the async lifecycle methods must be
    /// `Send + Sync`; they are created here but never polled.
    #[test]
    fn test_all_futures_thread_safe() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };
        let context = AudioContext::new(options);

        require_send_sync(context.suspend());
        require_send_sync(context.resume());
        require_send_sync(context.close());
    }

    /// A sample rate of zero is rejected with an "Invalid sample rate" error.
    #[test]
    fn test_try_new_invalid_sample_rate() {
        let options = AudioContextOptions {
            sample_rate: Some(0.),
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };

        let result = AudioContext::try_new(options);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Invalid sample rate"));
    }

    /// `new` panics for a sink id that matches no output device.
    #[test]
    #[should_panic]
    fn test_invalid_sink_id() {
        let options = AudioContextOptions {
            sink_id: "invalid".into(),
            ..AudioContextOptions::default()
        };
        let _ = AudioContext::new(options);
    }

    /// `try_new` reports the unknown sink id with the exact error message.
    #[test]
    fn test_try_new_invalid_sink_id() {
        let options = AudioContextOptions {
            sink_id: "invalid".into(),
            ..AudioContextOptions::default()
        };

        let error = AudioContext::try_new(options).unwrap_err();
        assert_eq!(
            error.to_string(),
            "NotFoundError - Invalid sinkId: \"invalid\""
        );
    }

    /// With the `diagnostics` feature, `run_diagnostics` delivers a report
    /// describing the backend, the render thread and the graph.
    #[cfg(feature = "diagnostics")]
    #[test]
    fn test_run_diagnostics_returns_structured_output() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };
        let context = AudioContext::new(options);
        let (sender, receiver) = std::sync::mpsc::channel();

        context.run_diagnostics(move |diagnostics| {
            sender.send(diagnostics).unwrap();
        });

        // The report arrives asynchronously; wait at most one second for it.
        let diagnostics = receiver
            .recv_timeout(std::time::Duration::from_secs(1))
            .unwrap();

        assert!(diagnostics.backend.name.contains("NoneBackend"));
        assert_eq!(diagnostics.backend.sink_id, "none");
        assert_eq!(diagnostics.backend.output_latency, Some(0.));
        assert_eq!(diagnostics.render_thread.sample_rate, context.sample_rate());
        assert_eq!(
            diagnostics.render_thread.number_of_channels,
            crate::MAX_CHANNELS
        );
        assert!(diagnostics.graph.active);
        assert_eq!(diagnostics.graph.node_count, diagnostics.graph.nodes.len());
        assert_eq!(diagnostics.graph.edge_count, 0);
        assert!(diagnostics.graph.in_cycle.is_empty());
        assert!(diagnostics.graph.cycle_breakers.is_empty());
        // The destination node must be listed with consistent channel counts.
        assert!(diagnostics
            .graph
            .nodes
            .iter()
            .any(|node| node.id == DESTINATION_NODE_ID.0
                && node.inputs == node.input_channels.len()
                && node.outputs == node.output_channels.len()));
    }
}