use std::error::Error;
use std::sync::Mutex;
use crate::context::{AudioContextState, BaseAudioContext, ConcreteBaseAudioContext};
use crate::events::{EventDispatch, EventHandler, EventLoop, EventPayload, EventType};
use crate::io::{self, AudioBackendManager, ControlThreadInit, NoneBackend, RenderThreadInit};
use crate::media_devices::{enumerate_devices_sync, MediaDeviceInfoKind};
use crate::media_streams::{MediaStream, MediaStreamTrack};
use crate::message::{ControlMessage, OneshotNotify};
use crate::node::{self, AudioNodeOptions};
use crate::render::graph::Graph;
use crate::MediaElement;
use crate::{AudioRenderCapacity, Event};
use futures_channel::oneshot;
/// Check if the provided `sink_id` is an acceptable audio output identifier.
///
/// The empty string (default device) and `"none"` (discard output) are always
/// accepted; any other value must equal the `device_id` of an enumerated
/// audio-output device.
fn is_valid_sink_id(sink_id: &str) -> bool {
    matches!(sink_id, "" | "none")
        || enumerate_devices_sync()
            .into_iter()
            .any(|dev| dev.kind() == MediaDeviceInfoKind::AudioOutput && dev.device_id() == sink_id)
}
/// Identify the type of playback, which affects tradeoffs between audio output
/// latency and power consumption.
#[derive(Copy, Clone, Debug)]
pub enum AudioContextLatencyCategory {
    /// Balance audio output latency and power consumption.
    Balanced,
    /// Provide the lowest audio output latency possible without glitching.
    /// This is the default (see the `Default` impl below).
    Interactive,
    /// Prioritize sustained playback without interruption over audio output
    /// latency.
    Playback,
    /// Specify the desired latency explicitly, in seconds.
    Custom(f64),
}
impl Default for AudioContextLatencyCategory {
    /// `Interactive` is the default latency hint.
    // NOTE: cannot use `#[derive(Default)]` here without touching the enum,
    // so the impl is written by hand.
    fn default() -> Self {
        Self::Interactive
    }
}
/// Hint for the render quantum size category of the context.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive] // additional categories may be added without a breaking change
pub enum AudioContextRenderSizeCategory {
    /// The default render quantum size.
    Default,
}
impl Default for AudioContextRenderSizeCategory {
    /// The only (currently) available category is also the default.
    fn default() -> Self {
        Self::Default
    }
}
/// Specifies the options to use when constructing an `AudioContext`.
#[derive(Clone, Debug, Default)]
pub struct AudioContextOptions {
    /// Identify the type of playback, which affects tradeoffs between audio
    /// output latency and power consumption.
    pub latency_hint: AudioContextLatencyCategory,
    /// Requested sample rate of the audio output. `None` presumably lets the
    /// backend pick — handled inside `io::build_output`, not visible here.
    pub sample_rate: Option<f32>,
    /// The audio output device id. The empty string and `"none"` are always
    /// accepted; any other value must match an enumerated audio-output
    /// device id (see `is_valid_sink_id`).
    pub sink_id: String,
    /// Hint for the render quantum size.
    pub render_size_hint: AudioContextRenderSizeCategory,
}
/// An audio context whose destination is rendered to a real-time audio output
/// device (or to a "none" sink that discards samples).
#[allow(clippy::module_name_repetitions)]
pub struct AudioContext {
    /// Shared context state and graph bookkeeping, common with the offline
    /// context variant.
    base: ConcreteBaseAudioContext,
    /// The active audio stream backend. Mutex-guarded because sink changes
    /// and suspend/resume/close all drive or replace it through `&self`.
    backend_manager: Mutex<Box<dyn AudioBackendManager>>,
    /// Handle delivering render-thread load values (built from
    /// `load_value_recv` in `new`).
    render_capacity: AudioRenderCapacity,
    /// Render-thread channel endpoints, retained so a new output stream can
    /// be spun up on a sink change (see `set_sink_id_sync`).
    render_thread_init: RenderThreadInit,
}
impl std::fmt::Debug for AudioContext {
    /// Hand-written `Debug`: reports the interesting values via the public
    /// accessors; `finish_non_exhaustive` marks the omitted private fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AudioContext")
            .field("sink_id", &self.sink_id())
            .field("base_latency", &self.base_latency())
            .field("output_latency", &self.output_latency())
            .field("base", &self.base())
            .finish_non_exhaustive()
    }
}
impl Drop for AudioContext {
    fn drop(&mut self) {
        // If the context is still running when it is dropped, swap the real
        // backend for a void placeholder and intentionally leak the original.
        // This keeps the audio thread alive past the drop — presumably so
        // playback is not cut off abruptly; call `close`/`close_sync` first
        // to release the stream deterministically. TODO confirm intent.
        if self.state() == AudioContextState::Running {
            let tombstone = Box::new(NoneBackend::void());
            // `get_mut` needs no locking: `&mut self` proves exclusive access.
            let original = std::mem::replace(self.backend_manager.get_mut().unwrap(), tombstone);
            Box::leak(original);
        }
    }
}
impl BaseAudioContext for AudioContext {
    /// Access the shared base context carrying the common context behavior.
    fn base(&self) -> &ConcreteBaseAudioContext {
        &self.base
    }
}
impl Default for AudioContext {
    /// Creates an `AudioContext` with default `AudioContextOptions`
    /// (default latency hint, backend-chosen sample rate, default device).
    fn default() -> Self {
        Self::new(AudioContextOptions::default())
    }
}
impl AudioContext {
    /// Creates and returns a new `AudioContext` object.
    ///
    /// This opens the audio output stream for the requested sink, wires up
    /// the control/render thread channels, ships an empty graph to the render
    /// thread, and starts the event loop thread.
    ///
    /// # Panics
    ///
    /// Panics with a "NotFoundError" message when `options.sink_id` does not
    /// name an available audio output device (see `is_valid_sink_id`).
    #[must_use]
    pub fn new(options: AudioContextOptions) -> Self {
        // Validate the sink id up front; an unknown device is unrecoverable.
        assert!(
            is_valid_sink_id(&options.sink_id),
            "NotFoundError - Invalid sinkId: {:?}",
            options.sink_id
        );
        // Channel endpoints connecting this control side with the render thread.
        let (control_thread_init, render_thread_init) = io::thread_init();
        // Open the actual audio output stream.
        let backend = io::build_output(options, render_thread_init.clone());
        let ControlThreadInit {
            state,
            frames_played,
            ctrl_msg_send,
            load_value_recv,
            event_send,
            event_recv,
        } = control_thread_init;
        // Lock-free queue used to recycle freed node ids from the render thread.
        let (node_id_producer, node_id_consumer) = llq::Queue::new().split();
        let graph = Graph::new(node_id_producer);
        // Ship the (empty) audio graph to the render thread. The message sits
        // in the channel until the render thread starts pulling.
        let message = ControlMessage::Startup { graph };
        ctrl_msg_send.send(message).unwrap();
        let event_loop = EventLoop::new(event_recv);
        let base = ConcreteBaseAudioContext::new(
            backend.sample_rate(),
            backend.number_of_channels(),
            state,
            frames_played,
            ctrl_msg_send,
            event_send,
            event_loop.clone(),
            false, // flag distinguishing this from the offline variant — TODO confirm
            node_id_consumer,
        );
        let base_clone = base.clone();
        let render_capacity = AudioRenderCapacity::new(base_clone, load_value_recv);
        // Handle incoming events on a dedicated thread for the context lifetime.
        event_loop.run_in_thread();
        Self {
            base,
            backend_manager: Mutex::new(backend),
            render_capacity,
            render_thread_init,
        }
    }

    /// Processing latency (in seconds) incurred by the context itself,
    /// excluding the audio output subsystem.
    // Hardcoded to zero: this implementation adds no extra processing latency.
    #[allow(clippy::unused_self)]
    #[must_use]
    pub fn base_latency(&self) -> f64 {
        0.
    }

    /// Estimated audio output latency in seconds, as reported by the active
    /// audio backend.
    ///
    /// # Panics
    ///
    /// Panics if the backend manager mutex is poisoned.
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
    pub fn output_latency(&self) -> f64 {
        self.backend_manager.lock().unwrap().output_latency()
    }

    /// Identifier of the audio output device currently in use.
    ///
    /// # Panics
    ///
    /// Panics if the backend manager mutex is poisoned.
    #[allow(clippy::missing_panics_doc)]
    pub fn sink_id(&self) -> String {
        self.backend_manager.lock().unwrap().sink_id().to_owned()
    }

    /// Returns an [`AudioRenderCapacity`] handle associated with this context,
    /// reporting the load of the render thread.
    #[must_use]
    pub fn render_capacity(&self) -> AudioRenderCapacity {
        self.render_capacity.clone()
    }

    /// Update the current audio output device, synchronously.
    ///
    /// The existing audio graph is recovered from the running stream and
    /// re-installed on a freshly built stream for the new sink; any control
    /// messages queued during the switch are replayed afterwards.
    ///
    /// # Errors
    ///
    /// Returns a "NotFoundError" when `sink_id` does not name an available
    /// audio output device.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned or an internal channel is disconnected.
    #[allow(clippy::needless_collect, clippy::missing_panics_doc)]
    pub fn set_sink_id_sync(&self, sink_id: String) -> Result<(), Box<dyn Error>> {
        log::debug!("SinkChange requested");
        // Switching to the sink already in use is a no-op.
        if self.sink_id() == sink_id {
            log::debug!("SinkChange: no-op");
            return Ok(());
        }
        if !is_valid_sink_id(&sink_id) {
            Err(format!("NotFoundError: invalid sinkId {sink_id}"))?;
        };
        log::debug!("SinkChange: locking backend manager");
        let mut backend_manager_guard = self.backend_manager.lock().unwrap();
        let original_state = self.state();
        // A closed context has no stream to move; silently succeed.
        if original_state == AudioContextState::Closed {
            log::debug!("SinkChange: context is closed");
            return Ok(());
        }
        // Take the exclusive control-message sender so no other thread can
        // interleave messages during the switch.
        log::debug!("SinkChange: locking message channel");
        let ctrl_msg_send = self.base.lock_control_msg_sender();
        // Drain the message channel: these messages were never seen by the
        // old render thread and will be replayed to the new one below.
        let mut pending_msgs: Vec<_> = self.render_thread_init.ctrl_msg_recv.try_iter().collect();
        // Recover the audio graph from the current stream.
        let graph = if matches!(pending_msgs.first(), Some(ControlMessage::Startup { .. })) {
            // Edge case: the render thread never processed its Startup message,
            // so the graph is still sitting in the channel — take it directly.
            log::debug!("SinkChange: recover unstarted graph");
            let msg = pending_msgs.remove(0);
            match msg {
                ControlMessage::Startup { graph } => graph,
                _ => unreachable!(),
            }
        } else {
            // Normal case: ask the render thread to shut down and hand the
            // graph back over a dedicated one-shot channel.
            log::debug!("SinkChange: recover graph from render thread");
            let (graph_send, graph_recv) = crossbeam_channel::bounded(1);
            let message = ControlMessage::CloseAndRecycle { sender: graph_send };
            ctrl_msg_send.send(message).unwrap();
            if original_state == AudioContextState::Suspended {
                // A suspended stream does not run render quanta, so resume it
                // briefly to let the CloseAndRecycle message be processed.
                backend_manager_guard.resume();
            }
            // Block until the render thread returns the graph.
            graph_recv.recv().unwrap()
        };
        log::debug!("SinkChange: closing audio stream");
        backend_manager_guard.close();
        // Build the replacement stream for the new sink, preserving the
        // current sample rate so the recycled graph stays valid.
        let options = AudioContextOptions {
            sample_rate: Some(self.sample_rate()),
            latency_hint: AudioContextLatencyCategory::default(),
            sink_id,
            render_size_hint: AudioContextRenderSizeCategory::default(),
        };
        log::debug!("SinkChange: starting audio stream");
        *backend_manager_guard = io::build_output(options, self.render_thread_init.clone());
        // Restore the suspended state on the new stream when applicable.
        if original_state == AudioContextState::Suspended {
            log::debug!("SinkChange: suspending audio stream");
            backend_manager_guard.suspend();
        }
        // Re-install the recovered graph, then replay the messages that were
        // pending while the switch was in progress.
        let message = ControlMessage::Startup { graph };
        ctrl_msg_send.send(message).unwrap();
        pending_msgs
            .into_iter()
            .for_each(|m| self.base().send_control_msg(m));
        // Release the backend lock before emitting the event — presumably so
        // event handling never observes the lock held. TODO confirm.
        drop(backend_manager_guard);
        let _ = self.base.send_event(EventDispatch::sink_change());
        log::debug!("SinkChange: done");
        Ok(())
    }

    /// Register a callback to run when the audio output sink has changed.
    ///
    /// Registering a new handler replaces the previous one for this event.
    pub fn set_onsinkchange<F: FnMut(Event) + Send + 'static>(&self, mut callback: F) {
        // Adapt the user callback to the internal event-handler signature,
        // ignoring the (empty) payload.
        let callback = move |_| {
            callback(Event {
                type_: "sinkchange",
            })
        };
        self.base().set_event_handler(
            EventType::SinkChange,
            EventHandler::Multiple(Box::new(callback)),
        );
    }

    /// Unset the callback to run when the audio output sink has changed.
    pub fn clear_onsinkchange(&self) {
        self.base().clear_event_handler(EventType::SinkChange);
    }

    /// Collect diagnostic information and deliver it to `callback` as text.
    ///
    /// Backend info is written here on the control side; the buffer is then
    /// sent to the render thread, and the result comes back asynchronously
    /// as a one-shot `Diagnostics` event (presumably with render-side state
    /// appended — not visible here).
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned or the returned diagnostics
    /// bytes are not valid UTF-8.
    #[allow(clippy::missing_panics_doc)]
    #[doc(hidden)]
    pub fn run_diagnostics<F: Fn(String) + Send + 'static>(&self, callback: F) {
        let mut buffer = Vec::with_capacity(32 * 1024);
        {
            // Scope the backend lock to just these writes.
            let backend = self.backend_manager.lock().unwrap();
            use std::io::Write;
            writeln!(&mut buffer, "backend: {}", backend.name()).ok();
            writeln!(&mut buffer, "sink id: {}", backend.sink_id()).ok();
            writeln!(
                &mut buffer,
                "output latency: {:.6}",
                backend.output_latency()
            )
            .ok();
        }
        // One-shot handler converting the returned bytes into a String.
        let callback = move |v| match v {
            EventPayload::Diagnostics(v) => {
                let s = String::from_utf8(v).unwrap();
                callback(s);
            }
            _ => unreachable!(),
        };
        self.base().set_event_handler(
            EventType::Diagnostics,
            EventHandler::Once(Box::new(callback)),
        );
        self.base()
            .send_control_msg(ControlMessage::RunDiagnostics { buffer });
    }

    /// Suspends the progression of time in the audio context.
    ///
    /// The audio graph is suspended first and the hardware stream only after
    /// the render thread has acknowledged, so no render quantum is lost.
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned or the acknowledgement channel
    /// is dropped.
    pub async fn suspend(&self) {
        log::debug!("Suspend called");
        if self.state() != AudioContextState::Running {
            log::debug!("Suspend no-op - context is not running");
            return;
        }
        // Ask the render thread to suspend and wait for its signal.
        let (sender, receiver) = oneshot::channel();
        let notify = OneshotNotify::Async(sender);
        self.base
            .send_control_msg(ControlMessage::Suspend { notify });
        log::debug!("Suspending audio graph, waiting for signal..");
        receiver.await.unwrap();
        log::debug!("Suspended audio graph. Suspending audio stream..");
        // The backend lock is taken only after the await completes, so the
        // std Mutex guard is never held across a suspension point.
        self.backend_manager.lock().unwrap().suspend();
        log::debug!("Suspended audio stream");
    }

    /// Resumes the progression of time in a suspended audio context.
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned or the acknowledgement channel
    /// is dropped.
    pub async fn resume(&self) {
        let (sender, receiver) = oneshot::channel();
        {
            // Scope the backend lock so the guard drops before `.await`:
            // a std Mutex guard must not live across a suspension point.
            log::debug!("Resume called, locking backend manager");
            let backend_manager_guard = self.backend_manager.lock().unwrap();
            if self.state() != AudioContextState::Suspended {
                log::debug!("Resume no-op - context is not suspended");
                return;
            }
            // Restart the hardware stream first so the render thread is able
            // to pick up the Resume message.
            backend_manager_guard.resume();
            log::debug!("Resumed audio stream, waking audio graph");
            let notify = OneshotNotify::Async(sender);
            self.base
                .send_control_msg(ControlMessage::Resume { notify });
        }
        receiver.await.unwrap();
        log::debug!("Resumed audio graph");
    }

    /// Closes the audio context, releasing the audio output stream.
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned or the acknowledgement channel
    /// is dropped.
    pub async fn close(&self) {
        log::debug!("Close called");
        if self.state() == AudioContextState::Closed {
            log::debug!("Close no-op - context is already closed");
            return;
        }
        if self.state() == AudioContextState::Running {
            // Running: ask the render thread to wind down and wait for it.
            let (sender, receiver) = oneshot::channel();
            let notify = OneshotNotify::Async(sender);
            self.base.send_control_msg(ControlMessage::Close { notify });
            log::debug!("Suspending audio graph, waiting for signal..");
            receiver.await.unwrap();
        } else {
            // Not running: no render-thread handshake needed, just flip state.
            self.base.set_state(AudioContextState::Closed);
        }
        log::debug!("Suspended audio graph. Closing audio stream..");
        self.backend_manager.lock().unwrap().close();
        // Stop the render-load metering tied to this context.
        self.render_capacity.stop();
        log::debug!("Closed audio stream");
    }

    /// Synchronous version of [`AudioContext::suspend`].
    ///
    /// Holds the backend lock for the whole operation (unlike the async
    /// version, which locks only after the render thread has acknowledged).
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned.
    pub fn suspend_sync(&self) {
        log::debug!("Suspend_sync called, locking backend manager");
        let backend_manager_guard = self.backend_manager.lock().unwrap();
        if self.state() != AudioContextState::Running {
            log::debug!("Suspend_sync no-op - context is not running");
            return;
        }
        // Rendezvous channel (capacity 0): recv blocks until the render
        // thread signals that the graph is suspended.
        let (sender, receiver) = crossbeam_channel::bounded(0);
        let notify = OneshotNotify::Sync(sender);
        self.base
            .send_control_msg(ControlMessage::Suspend { notify });
        log::debug!("Suspending audio graph, waiting for signal..");
        receiver.recv().ok();
        log::debug!("Suspended audio graph. Suspending audio stream..");
        backend_manager_guard.suspend();
        log::debug!("Suspended audio stream");
    }

    /// Synchronous version of [`AudioContext::resume`].
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned.
    pub fn resume_sync(&self) {
        log::debug!("Resume_sync called, locking backend manager");
        let backend_manager_guard = self.backend_manager.lock().unwrap();
        if self.state() != AudioContextState::Suspended {
            log::debug!("Resume no-op - context is not suspended");
            return;
        }
        // Restart the hardware stream first so the render thread is able to
        // pick up the Resume message.
        backend_manager_guard.resume();
        log::debug!("Resumed audio stream, waking audio graph");
        let (sender, receiver) = crossbeam_channel::bounded(0);
        let notify = OneshotNotify::Sync(sender);
        self.base
            .send_control_msg(ControlMessage::Resume { notify });
        receiver.recv().ok();
        log::debug!("Resumed audio graph");
    }

    /// Synchronous version of [`AudioContext::close`].
    ///
    /// # Panics
    ///
    /// Panics if the backend mutex is poisoned.
    pub fn close_sync(&self) {
        log::debug!("Close_sync called, locking backend manager");
        let backend_manager_guard = self.backend_manager.lock().unwrap();
        if self.state() == AudioContextState::Closed {
            log::debug!("Close no-op - context is already closed");
            return;
        }
        if self.state() == AudioContextState::Running {
            // Running: wait for the render thread to wind down the graph.
            let (sender, receiver) = crossbeam_channel::bounded(0);
            let notify = OneshotNotify::Sync(sender);
            self.base.send_control_msg(ControlMessage::Close { notify });
            log::debug!("Suspending audio graph, waiting for signal..");
            receiver.recv().ok();
        } else {
            // Not running: no render-thread handshake needed, just flip state.
            self.base.set_state(AudioContextState::Closed);
        }
        log::debug!("Suspended audio graph. Closing audio stream..");
        backend_manager_guard.close();
        // Stop the render-load metering tied to this context.
        self.render_capacity.stop();
        log::debug!("Closed audio stream");
    }

    /// Creates a `MediaStreamAudioSourceNode` from a [`MediaStream`].
    #[must_use]
    pub fn create_media_stream_source(
        &self,
        media: &MediaStream,
    ) -> node::MediaStreamAudioSourceNode {
        let opts = node::MediaStreamAudioSourceOptions {
            media_stream: media,
        };
        node::MediaStreamAudioSourceNode::new(self, opts)
    }

    /// Creates a `MediaStreamAudioDestinationNode` with default options.
    #[must_use]
    pub fn create_media_stream_destination(&self) -> node::MediaStreamAudioDestinationNode {
        let opts = AudioNodeOptions::default();
        node::MediaStreamAudioDestinationNode::new(self, opts)
    }

    /// Creates a `MediaStreamTrackAudioSourceNode` from a [`MediaStreamTrack`].
    #[must_use]
    pub fn create_media_stream_track_source(
        &self,
        media: &MediaStreamTrack,
    ) -> node::MediaStreamTrackAudioSourceNode {
        let opts = node::MediaStreamTrackAudioSourceOptions {
            media_stream_track: media,
        };
        node::MediaStreamTrackAudioSourceNode::new(self, opts)
    }

    /// Creates a `MediaElementAudioSourceNode` from a [`MediaElement`].
    /// Takes `&mut` because the node presumably assumes exclusive control of
    /// the element — TODO confirm against `MediaElementAudioSourceOptions`.
    #[must_use]
    pub fn create_media_element_source(
        &self,
        media_element: &mut MediaElement,
    ) -> node::MediaElementAudioSourceNode {
        let opts = node::MediaElementAudioSourceOptions { media_element };
        node::MediaElementAudioSourceNode::new(self, opts)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor;

    /// Drive a full suspend -> resume -> close lifecycle against the "none"
    /// sink and check that `current_time` only progresses while running.
    #[test]
    fn test_suspend_resume_close() {
        let options = AudioContextOptions {
            sink_id: "none".into(), // discard output; no audio device required
            ..AudioContextOptions::default()
        };
        let context = AudioContext::new(options);
        // Give the render thread a moment to start progressing time.
        std::thread::sleep(std::time::Duration::from_millis(1));
        executor::block_on(context.suspend());
        assert_eq!(context.state(), AudioContextState::Suspended);
        // Time must be frozen while suspended.
        let time1 = context.current_time();
        assert!(time1 >= 0.);
        std::thread::sleep(std::time::Duration::from_millis(1));
        let time2 = context.current_time();
        assert_eq!(time1, time2);
        executor::block_on(context.resume());
        assert_eq!(context.state(), AudioContextState::Running);
        // Time must progress again after resuming.
        std::thread::sleep(std::time::Duration::from_millis(1));
        let time3 = context.current_time();
        assert!(time3 > time2);
        executor::block_on(context.close());
        assert_eq!(context.state(), AudioContextState::Closed);
        // Time must be frozen after closing.
        let time4 = context.current_time();
        std::thread::sleep(std::time::Duration::from_millis(1));
        let time5 = context.current_time();
        assert_eq!(time5, time4);
    }

    // Compile-time helper: accepts only values that are Send + Sync.
    fn require_send_sync<T: Send + Sync>(_: T) {}

    /// The futures returned by suspend/resume/close must be Send + Sync so
    /// they can be awaited from multi-threaded executors.
    #[test]
    fn test_all_futures_thread_safe() {
        let options = AudioContextOptions {
            sink_id: "none".into(),
            ..AudioContextOptions::default()
        };
        let context = AudioContext::new(options);
        require_send_sync(context.suspend());
        require_send_sync(context.resume());
        require_send_sync(context.close());
    }

    /// Constructing a context with an unknown sink id must panic
    /// (NotFoundError assertion in `AudioContext::new`).
    #[test]
    #[should_panic]
    fn test_invalid_sink_id() {
        let options = AudioContextOptions {
            sink_id: "invalid".into(),
            ..AudioContextOptions::default()
        };
        let _ = AudioContext::new(options);
    }
}