use std::{
collections::HashMap,
fmt,
sync::{
atomic::{self, AtomicBool, AtomicUsize},
mpsc::{sync_channel, RecvTimeoutError, SyncSender},
Arc, Mutex,
},
thread,
time::Duration,
};
use basedrop::{Collector, Handle, Owned};
use crossbeam_queue::ArrayQueue;
use dashmap::DashMap;
use crate::{
effect::Effect,
error::Error,
output::OutputDevice,
source::{
amplified::AmplifiedSource,
converted::ConvertedSource,
file::FileSource,
guarded::GuardedSource,
mapped::ChannelMappedSource,
measured::{CpuLoad, MeasuredSource, SharedCpuLoadState},
metered::{AudioLevel, MeteredSource, SharedAudioLevelState},
mixed::{
EffectProcessor, MixedSource, MixerMessage, SubMixerProcessor, SubMixerThreadPool,
},
panned::PannedSource,
playback::PlaybackMessageQueue,
resampled::ResamplingQuality,
status::{PlaybackStatusContext, PlaybackStatusEvent},
synth::SynthSource,
Source,
},
Generator,
};
mod handles;
/// Identifier of a source that is tracked by a [`Player`].
pub type PlaybackId = usize;
/// Identifier of a mixer within a [`Player`]. The main mixer uses id `0`.
pub type MixerId = usize;
/// Identifier of an effect that was added to one of the player's mixers.
pub type EffectId = usize;
/// Identifier of a single note playback.
/// NOTE(review): not referenced in this file; presumably used by the playback handles — confirm.
pub type NotePlaybackId = usize;
pub use handles::{
EffectHandle, FilePlaybackHandle, GeneratorPlaybackHandle, MixerHandle, SourcePlaybackHandle,
SynthPlaybackHandle,
};
/// Panic handler type of the player's guarded main mixer, re-exported from
/// `crate::source::guarded`. Installed via [`Player::set_panic_handler`].
pub type PanicHandler = crate::source::guarded::PanicHandler;
/// Describes how an effect should be repositioned via [`Player::move_effect`].
///
/// NOTE(review): the movement itself is applied by the mixer which receives the
/// `MixerMessage::MoveEffect` message; the variant descriptions below are inferred
/// from the variant names and should be confirmed against the mixer implementation.
pub enum EffectMovement {
/// Presumably a relative move by the given signed offset — TODO confirm sign convention.
Direction(i32),
/// Presumably moves the effect to the start of the mixer's effect chain.
Start,
/// Presumably moves the effect to the end of the mixer's effect chain.
End,
}
/// Bookkeeping entry for a source that was handed to one of the player's mixers.
struct PlayingSource {
// Flag shared with the source's playback handle; set to `false` when this entry is dropped.
is_playing: Arc<AtomicBool>,
// `true` for sources started via the `play_*` functions, `false` for generators
// which got added via `add_generator`.
is_transient: bool,
// Message queues of the source; used e.g. to send a stop message in `stop_all_sources`.
playback_message_queue: PlaybackMessageQueue,
// Id of the mixer the source was added to.
mixer_id: MixerId,
// Human readable name, used when displaying the player's mixer graph.
source_name: String,
}
impl Drop for PlayingSource {
    /// Clears the shared `is_playing` flag, so that everyone who holds a clone of the
    /// flag can observe that the player no longer tracks this source.
    fn drop(&mut self) {
        let playing_flag: &AtomicBool = &self.is_playing;
        playing_flag.store(false, atomic::Ordering::Relaxed);
    }
}
/// Bookkeeping entry for a mixer that is known to the player.
#[derive(Debug, Clone)]
struct PlayerMixerInfo {
// Id of the mixer's parent. The main mixer is registered with itself as parent.
parent_id: MixerId,
// Queue used to send `MixerMessage`s to the mixer.
event_queue: Arc<ArrayQueue<MixerMessage>>,
}
/// Bookkeeping entry for an effect that was added to one of the player's mixers.
#[derive(Debug, Copy, Clone)]
struct PlayerEffectInfo {
// Id of the mixer the effect was added to.
mixer_id: MixerId,
// Name as reported by the effect when it got added; used for display purposes.
effect_name: &'static str,
}
/// Configuration options for a [`Player`].
#[derive(Debug, Clone)]
pub struct PlayerConfig {
/// When `true`, the player mixes with 2 channels and maps the result to the output
/// device's channel layout if the device does not use 2 channels.
pub enforce_stereo_playback: bool,
/// When `true` and more than one worker thread is available, the main mixer gets
/// a sub-mixer thread pool.
pub concurrent_processing: bool,
/// Number of worker threads for the thread pool. `None` uses `num_cpus::get()`.
pub concurrent_worker_threads: Option<usize>,
/// Interval handed to the CPU load measuring sources of mixers and of those sources
/// which enabled CPU load measuring in their playback options.
pub measuring_interval: Option<Duration>,
/// Interval handed to the audio level metering sources of the mixers.
pub metering_interval: Option<Duration>,
}
impl Default for PlayerConfig {
fn default() -> Self {
Self::new()
}
}
impl PlayerConfig {
    /// Creates a configuration with the default settings: stereo playback enforced,
    /// concurrent processing enabled with an automatically chosen thread count,
    /// CPU load measuring every 250 ms and audio level metering disabled.
    pub fn new() -> Self {
        Self {
            enforce_stereo_playback: true,
            concurrent_processing: true,
            concurrent_worker_threads: None,
            measuring_interval: Some(Duration::from_millis(250)),
            metering_interval: None,
        }
    }

    /// Enables or disables enforced stereo playback.
    pub fn enforce_stereo_playback(mut self, enabled: bool) -> Self {
        self.enforce_stereo_playback = enabled;
        self
    }

    /// Enables or disables concurrent processing of sub-mixers.
    pub fn concurrent_processing(mut self, enabled: bool) -> Self {
        self.concurrent_processing = enabled;
        self
    }

    /// Sets an explicit number of worker threads for concurrent processing.
    pub fn concurrent_worker_threads(mut self, count: usize) -> Self {
        self.concurrent_worker_threads = Some(count);
        self
    }

    /// Sets the CPU load measuring interval. `None` passes no interval to the
    /// measuring sources.
    pub fn measuring_interval(mut self, interval: Option<Duration>) -> Self {
        self.measuring_interval = interval;
        self
    }

    /// Sets the audio level metering interval. `None` passes no interval to the
    /// metering sources.
    pub fn metering_interval(mut self, interval: Option<Duration>) -> Self {
        self.metering_interval = interval;
        self
    }

    /// Returns the configured worker thread count, falling back to the number of
    /// CPUs reported by `num_cpus::get()` when none was configured.
    pub fn effective_concurrent_worker_threads(&self) -> usize {
        // Only query the CPU count when no explicit thread count is set.
        self.concurrent_worker_threads
            .unwrap_or_else(num_cpus::get)
    }
}
/// Plays back sources on an output device through a tree of mixers.
///
/// The player owns the output device, keeps track of all playing sources, mixers and
/// effects and runs two helper threads: one which forwards playback status events
/// and one which collects dropped `basedrop` allocations.
pub struct Player {
// Configuration the player was created with.
config: PlayerConfig,
// Device which plays back the main mixer.
output_device: Box<dyn OutputDevice>,
// All sources which are currently tracked; shared with the playback status thread.
playing_sources: Arc<DashMap<PlaybackId, PlayingSource>>,
// Keeps the playback status thread running while `true`.
playback_status_running: Arc<AtomicBool>,
// Proxy sender which is handed to sources to report their playback status.
playback_status_sender: SyncSender<PlaybackStatusEvent>,
// Join handle of the playback status thread; taken in `drop`.
playback_status_thread: Option<thread::JoinHandle<()>>,
// Handle used to allocate `Owned` values which are freed by the collector thread.
collector_handle: Handle,
// Keeps the collector thread running while `true`.
collector_running: Arc<AtomicBool>,
// Join handle of the collector thread; taken in `drop`.
collector_thread: Option<thread::JoinHandle<()>>,
// All known mixers, including the main mixer.
mixers: DashMap<MixerId, PlayerMixerInfo>,
// All effects which got added to any of the mixers.
effects: DashMap<EffectId, PlayerEffectInfo>,
// Shared CPU load state of the main mixer's measuring source.
main_mixer_measurement_state: Option<SharedCpuLoadState>,
// Shared audio level state of the main mixer's metering source.
main_mixer_metering_state: Option<SharedAudioLevelState>,
// Optional panic handler, shared with the guarded main mixer.
main_mixer_panic_handler: Arc<Mutex<Option<PanicHandler>>>,
// Drop signal of the guarded main mixer; `drop` waits for it to become `true`.
main_mixer_dropped: Arc<atomic::AtomicBool>,
}
impl Player {
/// Id of the player's main mixer, which is used whenever no mixer id is specified.
const MAIN_MIXER_ID: MixerId = 0;
/// Creates a new player for the given output device with the default [`PlayerConfig`].
///
/// See [`Player::new_with_config`] for details.
pub fn new<S: Into<Option<SyncSender<PlaybackStatusEvent>>>>(
    output_device: impl OutputDevice + 'static,
    playback_status_sender: S,
) -> Self {
    let default_config = PlayerConfig::default();
    Self::new_with_config(output_device, playback_status_sender, default_config)
}
/// Creates a new player for the given output device, using the given configuration.
///
/// Spawns the playback status thread and the drop collector thread, creates the main
/// mixer (wrapped into metering, measuring and panic guarding sources) and passes it
/// to the output device for playback.
///
/// # Arguments
/// * `output_device` - device which plays back the main mixer's output.
/// * `playback_status_sender` - optional channel to which playback status events are forwarded.
/// * `config` - the player's configuration.
///
/// # Panics
/// Panics when one of the helper threads could not be spawned.
pub fn new_with_config<S: Into<Option<SyncSender<PlaybackStatusEvent>>>>(
output_device: impl OutputDevice + 'static,
playback_status_sender: S,
config: PlayerConfig,
) -> Self {
log::info!("Creating a new player...");
let mut output_device = Box::new(output_device);
let playing_sources = Arc::new(DashMap::with_capacity(1024));
// Start the thread which tracks stopped sources and forwards status events.
let playback_status_running = Arc::new(AtomicBool::new(true));
let playback_status_sender = playback_status_sender.into();
let (playback_status_sender, playback_status_thread) = Self::handle_playback_events(
playback_status_sender,
Arc::clone(&playing_sources),
Arc::clone(&playback_status_running),
);
let playback_status_thread = Some(playback_status_thread);
// Start the thread which frees `Owned` allocations after they got dropped.
let collector = Collector::new();
let collector_handle = collector.handle();
let collector_running = Arc::new(AtomicBool::new(true));
let collector_thread = Some(Self::handle_drop_collects(
collector,
Arc::clone(&collector_running),
));
// The main mixer runs with 2 channels when stereo playback is enforced.
let mut main_mixer = MixedSource::new(
if config.enforce_stereo_playback {
2
} else {
output_device.channel_count()
},
output_device.sample_rate(),
);
// Only create a thread pool when concurrent processing is enabled and more than
// one worker thread is available.
let thread_pool = (config.concurrent_processing
&& config.effective_concurrent_worker_threads() > 1)
.then(|| {
log::info!(
"Creating mixer thread pool with {} threads...",
config.effective_concurrent_worker_threads()
);
SubMixerThreadPool::new(
config.effective_concurrent_worker_threads(),
output_device.sample_rate(),
)
});
main_mixer.set_thread_pool(thread_pool);
let mixer_event_queue = main_mixer.message_queue();
// Wrap the main mixer: metering first, then CPU load measuring.
let metered_main_mixer = MeteredSource::new(main_mixer, config.metering_interval);
let main_mixer_metering_state = metered_main_mixer.state();
let measured_main_mixer =
MeasuredSource::new(metered_main_mixer, config.measuring_interval);
let main_mixer_measurement_state = measured_main_mixer.state();
// Register the main mixer; it uses itself as parent.
let mixers = DashMap::new();
mixers.insert(
Player::MAIN_MIXER_ID,
PlayerMixerInfo {
parent_id: Player::MAIN_MIXER_ID,
event_queue: mixer_event_queue,
},
);
let effects = DashMap::new();
// Guard the main mixer with the shared panic handler and attach the drop signal
// which `Drop for Player` waits for.
let main_mixer_panic_handler = Arc::new(Mutex::new(None));
let main_mixer_dropped = Arc::new(atomic::AtomicBool::new(false));
let guarded_main_mixer = GuardedSource::new(
measured_main_mixer,
"Player Main-Mixer",
Arc::clone(&main_mixer_panic_handler),
)
.with_drop_signal(Arc::clone(&main_mixer_dropped));
// Map the enforced stereo signal to the device's channel count when they differ.
if config.enforce_stereo_playback && output_device.channel_count() != 2 {
let channel_mapped_source =
ChannelMappedSource::new(guarded_main_mixer, output_device.channel_count());
output_device.play(channel_mapped_source.into_box());
} else {
output_device.play(guarded_main_mixer.into_box());
}
Self {
config,
output_device,
playing_sources,
playback_status_running,
playback_status_sender,
playback_status_thread,
collector_handle,
collector_running,
collector_thread,
mixers,
effects,
main_mixer_dropped,
main_mixer_panic_handler,
main_mixer_measurement_state,
main_mixer_metering_state,
}
}
/// Returns whether the output device reports itself as suspended.
pub fn output_suspended(&self) -> bool {
self.output_device.is_suspended()
}
/// Returns the sample rate of the output device.
pub fn output_sample_rate(&self) -> u32 {
self.output_device.sample_rate()
}
/// Returns the channel count the player mixes with: `2` when stereo playback is
/// enforced, else the output device's channel count.
pub fn output_channel_count(&self) -> usize {
    if !self.config.enforce_stereo_playback {
        return self.output_device.channel_count();
    }
    2
}
/// Returns the output device's sample position divided by the device's channel
/// count, or `0` when the device reports no channels.
pub fn output_sample_frame_position(&self) -> u64 {
    match self.output_device.channel_count() {
        0 => 0,
        channel_count => self.output_device.sample_position() / channel_count as u64,
    }
}
/// Returns the output device's current volume.
pub fn output_volume(&self) -> f32 {
self.output_device.volume()
}
/// Sets the output device's volume.
///
/// # Panics
/// Panics when `volume` is not `>= 0.0` (negative or NaN).
pub fn set_output_volume(&mut self, volume: f32) {
assert!(volume >= 0.0);
self.output_device.set_volume(volume);
}
/// Returns the main mixer's current CPU load.
///
/// Returns `None` when no measurement state is available or when the state's lock
/// could not be acquired without blocking.
pub fn cpu_load(&self) -> Option<CpuLoad> {
    let shared_state = self.main_mixer_measurement_state.as_ref()?;
    let state = shared_state.try_lock().ok()?;
    Some(state.cpu_load())
}
pub fn cpu_load_state(&self) -> Option<SharedCpuLoadState> {
self.main_mixer_measurement_state.as_ref().map(Arc::clone)
}
/// Returns a copy of the main mixer's current audio level.
///
/// Returns `None` when no metering state is available or when the state's lock
/// could not be acquired without blocking.
pub fn audio_level(&self) -> Option<AudioLevel> {
    let shared_state = self.main_mixer_metering_state.as_ref()?;
    let state = shared_state.try_lock().ok()?;
    Some(state.audio_level().clone())
}
pub fn audio_level_state(&self) -> Option<SharedAudioLevelState> {
self.main_mixer_metering_state.as_ref().map(Arc::clone)
}
/// Installs, replaces or removes (`None`) the panic handler which is shared with
/// the guarded main mixer.
///
/// # Panics
/// Panics when the panic handler lock is poisoned.
pub fn set_panic_handler(&mut self, handler: Option<PanicHandler>) {
    let mut handler_slot = self
        .main_mixer_panic_handler
        .lock()
        .expect("Failed access panic handler lock");
    *handler_slot = handler;
}
/// Returns whether the output device reports itself as running.
pub fn is_running(&self) -> bool {
self.output_device.is_running()
}
/// Starts playback by resuming the output device.
pub fn start(&mut self) {
self.output_device.resume();
}
/// Stops playback by pausing the output device.
pub fn stop(&mut self) {
self.output_device.pause();
}
/// Plays the given file source without a playback status context.
///
/// Shortcut for [`Player::play_file_source_with_context`] with `context` set to `None`.
pub fn play_file_source<F: FileSource, T: Into<Option<u64>>>(
&mut self,
file_source: F,
start_time: T,
) -> Result<FilePlaybackHandle, Error> {
self.play_file_source_with_context(file_source, start_time, None)
}
/// Plays the given file source on the mixer selected by its playback options
/// (the main mixer by default) and returns a handle to control the playback.
///
/// # Arguments
/// * `file_source` - the source to play.
/// * `start_time` - sample time which is passed along with the source to the mixer;
///   `None` is passed as `0`.
/// * `context` - optional context which is set on the source for its status events.
///
/// # Errors
/// Fails when the playback options are invalid, the target mixer is unknown or the
/// mixer's event queue is full.
pub fn play_file_source_with_context<F: FileSource, T: Into<Option<u64>>>(
    &mut self,
    file_source: F,
    start_time: T,
    context: Option<PlaybackStatusContext>,
) -> Result<FilePlaybackHandle, Error> {
    // Validate the options and resolve the target mixer before touching any state.
    let playback_options = *file_source.playback_options();
    playback_options.validate()?;
    let mixer_id = playback_options.target_mixer.unwrap_or(Self::MAIN_MIXER_ID);
    let mixer_event_queue = self.mixer_event_queue(mixer_id)?;

    // Prepare the source for status reporting and fetch its identity.
    let mut file_source = file_source;
    file_source.set_playback_status_sender(Some(self.playback_status_sender.clone()));
    file_source.set_playback_status_context(context);
    let playback_id = file_source.playback_id();
    let source_message_queue = file_source.playback_message_queue();
    let source_name = format!("File: '{}'", file_source.file_name());

    // Wrap the source: convert -> amplify -> pan -> measure.
    let converted = ConvertedSource::new(
        file_source,
        self.output_channel_count(),
        self.output_sample_rate(),
        ResamplingQuality::Default,
    );
    let amplified = AmplifiedSource::new(converted, playback_options.volume);
    let volume_message_queue = amplified.message_queue();
    let panned = PannedSource::new(amplified, playback_options.panning);
    let panning_message_queue = panned.message_queue();
    // Only measure the CPU load when the source's playback options ask for it.
    let measure_interval = self
        .config
        .measuring_interval
        .filter(|_| playback_options.measure_cpu_load);
    let measured = MeasuredSource::new(panned, measure_interval);
    let measurement_state = measured.state();

    let playback_message_queue = PlaybackMessageQueue::File {
        playback: source_message_queue,
        volume: volume_message_queue,
        panning: panning_message_queue,
    };

    // Register the source before it gets sent to the mixer.
    let is_playing = Arc::new(AtomicBool::new(true));
    let playing_source = PlayingSource {
        is_playing: Arc::clone(&is_playing),
        is_transient: true,
        playback_message_queue: playback_message_queue.clone(),
        mixer_id,
        source_name,
    };
    self.playing_sources.insert(playback_id, playing_source);

    let source = Owned::new(&self.collector_handle, measured.into_box());
    let sample_time = start_time.into().unwrap_or(0);
    let add_source_message = MixerMessage::AddSource {
        is_transient: true,
        playback_id,
        playback_message_queue: playback_message_queue.clone(),
        source,
        sample_time,
    };
    if mixer_event_queue.push(add_source_message).is_err() {
        // Roll back the registration when the mixer did not accept the source.
        self.playing_sources.remove(&playback_id);
        return Err(Self::mixer_event_queue_error("play_file"));
    }
    Ok(FilePlaybackHandle::new(
        is_playing,
        playback_id,
        playback_message_queue,
        mixer_event_queue,
        measurement_state,
    ))
}
/// Plays the given synth source without a playback status context.
///
/// Shortcut for [`Player::play_synth_source_with_context`] with `context` set to `None`.
pub fn play_synth_source<S: SynthSource, T: Into<Option<u64>>>(
&mut self,
synth_source: S,
start_time: T,
) -> Result<SynthPlaybackHandle, Error> {
self.play_synth_source_with_context(synth_source, start_time, None)
}
/// Plays the given synth source on the mixer selected by its playback options
/// (the main mixer by default) and returns a handle to control the playback.
///
/// # Arguments
/// * `synth_source` - the source to play.
/// * `start_time` - sample time which is passed along with the source to the mixer;
///   `None` is passed as `0`.
/// * `context` - optional context which is set on the source for its status events.
///
/// # Errors
/// Fails when the playback options are invalid, the target mixer is unknown or the
/// mixer's event queue is full.
pub fn play_synth_source_with_context<S: SynthSource, T: Into<Option<u64>>>(
    &mut self,
    synth_source: S,
    start_time: T,
    context: Option<PlaybackStatusContext>,
) -> Result<SynthPlaybackHandle, Error> {
    // Validate the options and resolve the target mixer before touching any state.
    let playback_options = *synth_source.playback_options();
    playback_options.validate()?;
    let mixer_id = playback_options.target_mixer.unwrap_or(Self::MAIN_MIXER_ID);
    let mixer_event_queue = self.mixer_event_queue(mixer_id)?;

    // Prepare the source for status reporting and fetch its identity.
    let mut synth_source = synth_source;
    synth_source.set_playback_status_sender(Some(self.playback_status_sender.clone()));
    synth_source.set_playback_status_context(context);
    let playback_id = synth_source.playback_id();
    let source_message_queue = synth_source.playback_message_queue();
    let source_name = format!("Synth: '{}'", synth_source.synth_name());

    // Wrap the source: convert -> amplify -> pan -> measure.
    let converted = ConvertedSource::new(
        synth_source,
        self.output_channel_count(),
        self.output_sample_rate(),
        ResamplingQuality::Default,
    );
    let amplified = AmplifiedSource::new(converted, playback_options.volume);
    let volume_message_queue = amplified.message_queue();
    let panned = PannedSource::new(amplified, playback_options.panning);
    let panning_message_queue = panned.message_queue();
    // Only measure the CPU load when the source's playback options ask for it.
    let measure_interval = self
        .config
        .measuring_interval
        .filter(|_| playback_options.measure_cpu_load);
    let measured = MeasuredSource::new(panned, measure_interval);
    let measurement_state = measured.state();

    let playback_message_queue = PlaybackMessageQueue::Synth {
        playback: source_message_queue,
        volume: volume_message_queue,
        panning: panning_message_queue,
    };

    // Register the source before it gets sent to the mixer.
    let is_playing = Arc::new(AtomicBool::new(true));
    let playing_source = PlayingSource {
        is_playing: Arc::clone(&is_playing),
        is_transient: true,
        playback_message_queue: playback_message_queue.clone(),
        mixer_id,
        source_name,
    };
    self.playing_sources.insert(playback_id, playing_source);

    let source = Owned::new(&self.collector_handle, measured.into_box());
    let sample_time = start_time.into().unwrap_or(0);
    let add_source_message = MixerMessage::AddSource {
        is_transient: true,
        playback_id,
        playback_message_queue: playback_message_queue.clone(),
        source,
        sample_time,
    };
    if mixer_event_queue.push(add_source_message).is_err() {
        // Roll back the registration when the mixer did not accept the source.
        self.playing_sources.remove(&playback_id);
        return Err(Self::mixer_event_queue_error("play_synth"));
    }
    Ok(SynthPlaybackHandle::new(
        is_playing,
        playback_id,
        playback_message_queue,
        mixer_event_queue,
        measurement_state,
    ))
}
/// Plays the given generator as a transient source on the mixer selected by its
/// playback options (the main mixer by default).
///
/// # Errors
/// See [`Player::add_generator`]: the same validation and queue errors apply.
pub fn play_generator<G: Generator + 'static, T: Into<Option<u64>>>(
    &mut self,
    generator: G,
    start_time: T,
) -> Result<GeneratorPlaybackHandle, Error> {
    let target_mixer = generator.playback_options().target_mixer;
    let mixer_id = target_mixer.unwrap_or(Self::MAIN_MIXER_ID);
    // Played generators are always transient.
    self.add_or_play_generator(generator, true, mixer_id, start_time)
}
/// Adds the given generator as a non-transient source to the given mixer
/// (the main mixer when `mixer_id` is `None`).
///
/// A target mixer in the generator's playback options is ignored; a warning is
/// logged when it differs from `mixer_id`.
///
/// # Errors
/// Fails when the playback options are invalid, the mixer is unknown or the mixer's
/// event queue is full.
pub fn add_generator<G: Generator + 'static, M: Into<Option<MixerId>>>(
    &mut self,
    generator: G,
    mixer_id: M,
) -> Result<GeneratorPlaybackHandle, Error> {
    let mixer_id = mixer_id.into().unwrap_or(Self::MAIN_MIXER_ID);
    let options_target_mixer = generator.playback_options().target_mixer;
    if matches!(options_target_mixer, Some(target_mixer_id) if target_mixer_id != mixer_id) {
        log::warn!("Ignoring target mixer id from playback options when adding instead of playing a generator");
    }
    // Added generators are never transient and have no start time.
    self.add_or_play_generator(generator, false, mixer_id, None)
}
/// Removes a generator which was previously added via [`Player::add_generator`].
///
/// Sends a `RemoveSource` message to the generator's mixer and drops the player's
/// bookkeeping entry for it.
///
/// # Errors
/// Returns `Error::GeneratorNotFoundError` when no source with the given id is
/// tracked, and fails when the mixer is unknown or its event queue is full.
pub fn remove_generator(&self, playback_id: PlaybackId) -> Result<(), Error> {
// The map guard returned by `get` only lives for the duration of the `match`,
// so it is released before the entry is removed below.
match self.playing_sources.get(&playback_id) {
Some(playing_source) => {
// Only checked in debug builds.
debug_assert!(
!playing_source.is_transient,
"Expected a non transient generator here, which was added via 'add_generator'"
);
if self
.mixer_event_queue(playing_source.mixer_id)?
.push(MixerMessage::RemoveSource { playback_id })
.is_err()
{
return Err(Self::mixer_event_queue_error("remove_generator"));
}
}
None => return Err(Error::GeneratorNotFoundError(playback_id)),
}
self.playing_sources.remove(&playback_id);
Ok(())
}
/// Creates a new sub-mixer and adds it to the given parent mixer (the main mixer
/// when `parent_mixer_id` is `None`).
///
/// # Errors
/// Fails when the parent mixer is unknown or the parent's event queue is full.
pub fn add_mixer<M: Into<Option<MixerId>>>(
    &mut self,
    parent_mixer_id: M,
) -> Result<MixerHandle, Error> {
    let parent_id = parent_mixer_id.into().unwrap_or(Self::MAIN_MIXER_ID);
    let parent_event_queue = self.mixer_event_queue(parent_id)?;

    // Create the mixer and wrap it: metering first, then CPU load measuring.
    let sub_mixer = MixedSource::new(self.output_channel_count(), self.output_sample_rate());
    let event_queue = sub_mixer.message_queue();
    let mixer_id = Self::unique_mixer_id();
    let metered = MeteredSource::new(sub_mixer, self.config.metering_interval);
    let metering_state = metered.state();
    let measured = MeasuredSource::new(metered, self.config.measuring_interval);
    let measurement_state = measured.state();

    let mixer_processor = Owned::new(
        &self.collector_handle,
        SubMixerProcessor::new(Box::new(measured)),
    );
    let add_mixer_message = MixerMessage::AddMixer {
        mixer_id,
        mixer_processor,
    };
    if parent_event_queue.push(add_mixer_message).is_err() {
        return Err(Self::mixer_event_queue_error("add_mixer"));
    }

    // Only register the mixer once the parent accepted it.
    let mixer_info = PlayerMixerInfo {
        parent_id,
        event_queue,
    };
    self.mixers.insert(mixer_id, mixer_info);
    Ok(MixerHandle::new(
        mixer_id,
        measurement_state,
        metering_state,
    ))
}
/// Removes the given sub-mixer from its parent mixer and drops the player's
/// bookkeeping for the mixer and for all effects which were added to it.
///
/// # Errors
/// Returns `Error::ParameterError` when trying to remove the main mixer, and fails
/// when the mixer or its parent is unknown or the parent's event queue is full.
pub fn remove_mixer(&mut self, mixer_id: MixerId) -> Result<(), Error> {
    if mixer_id == Self::MAIN_MIXER_ID {
        return Err(Error::ParameterError(
            "Cannot remove the main mixer".to_string(),
        ));
    }
    let parent_mixer_id = self.mixer_parent_id(mixer_id)?;
    let parent_mixer_event_queue = self.mixer_event_queue(parent_mixer_id)?;
    if parent_mixer_event_queue
        .push(MixerMessage::RemoveMixer { mixer_id })
        .is_err()
    {
        return Err(Self::mixer_event_queue_error("remove_mixer"));
    }
    // Drop all effects of the removed mixer in place, without collecting their ids
    // into a temporary list first.
    self.effects
        .retain(|_, effect_info| effect_info.mixer_id != mixer_id);
    // NOTE(review): sub-mixers of the removed mixer and sources which were routed to
    // it stay registered in `mixers` / `playing_sources` — confirm this is intended.
    self.mixers.remove(&mixer_id);
    Ok(())
}
/// Removes all direct sub-mixers of the given mixer (the main mixer when
/// `mixer_id` is `None`).
///
/// # Errors
/// Stops at and returns the first error reported by [`Player::remove_mixer`].
pub fn remove_all_mixers<M: Into<Option<MixerId>>>(
    &mut self,
    mixer_id: M,
) -> Result<(), Error> {
    let parent_id = mixer_id.into().unwrap_or(Self::MAIN_MIXER_ID);
    self.sub_mixers_of(parent_id)
        .into_iter()
        .try_for_each(|sub_mixer_id| self.remove_mixer(sub_mixer_id))
}
/// Initializes the given effect and adds it to the given mixer (the main mixer when
/// `mixer_id` is `None`).
///
/// # Errors
/// Fails when the mixer is unknown, the effect fails to initialize or the mixer's
/// event queue is full.
pub fn add_effect<E: Effect, M: Into<Option<MixerId>>>(
    &mut self,
    effect: E,
    mixer_id: M,
) -> Result<EffectHandle, Error> {
    let mixer_id = mixer_id.into().unwrap_or(Self::MAIN_MIXER_ID);
    let mixer_event_queue = self.mixer_event_queue(mixer_id)?;

    // Initialize the effect with the player's signal specs.
    let channel_count = self.output_channel_count();
    let max_frames = MixedSource::MAX_MIX_BUFFER_SAMPLES / channel_count;
    let mut boxed_effect = effect.into_box();
    let effect_name = boxed_effect.name();
    boxed_effect.initialize(self.output_sample_rate(), channel_count, max_frames)?;

    let effect_processor = Owned::new(
        &self.collector_handle,
        EffectProcessor::new(boxed_effect),
    );
    let effect_id = Self::unique_effect_id();
    let add_effect_message = MixerMessage::AddEffect {
        effect_id,
        effect_processor,
    };
    if mixer_event_queue.push(add_effect_message).is_err() {
        return Err(Self::mixer_event_queue_error("add_effect"));
    }

    // Only register the effect once the mixer accepted it.
    let effect_info = PlayerEffectInfo {
        mixer_id,
        effect_name,
    };
    self.effects.insert(effect_id, effect_info);
    Ok(EffectHandle::new(
        effect_id,
        mixer_id,
        effect_name,
        mixer_event_queue,
        self.collector_handle.clone(),
    ))
}
/// Asks the given mixer (the main mixer when `mixer_id` is `None`) to move one of
/// its effects.
///
/// # Errors
/// Fails when the effect is unknown, does not belong to the given mixer, the mixer
/// is unknown or the mixer's event queue is full.
pub fn move_effect<M: Into<Option<MixerId>>>(
    &mut self,
    movement: EffectMovement,
    effect_id: EffectId,
    mixer_id: M,
) -> Result<(), Error> {
    let mixer_id = mixer_id.into().unwrap_or(Self::MAIN_MIXER_ID);
    let owning_mixer_id = self.effect_parent_mixer_id(effect_id)?;
    if owning_mixer_id != mixer_id {
        return Err(Error::ParameterError(format!(
            "Effect {} does not belong to mixer {}",
            effect_id, mixer_id
        )));
    }
    let move_effect_message = MixerMessage::MoveEffect {
        effect_id,
        movement,
    };
    match self.mixer_event_queue(mixer_id)?.push(move_effect_message) {
        Ok(()) => Ok(()),
        Err(_) => Err(Self::mixer_event_queue_error("move_effect")),
    }
}
/// Removes the given effect from the mixer it was added to.
///
/// # Errors
/// Fails when the effect or its mixer is unknown or the mixer's event queue is full.
pub fn remove_effect(&mut self, effect_id: EffectId) -> Result<(), Error> {
    let mixer_event_queue = self.effect_mixer_event_queue(effect_id)?;
    let remove_effect_message = MixerMessage::RemoveEffect { effect_id };
    if mixer_event_queue.push(remove_effect_message).is_err() {
        return Err(Self::mixer_event_queue_error("remove_effect"));
    }
    // Only forget the effect once the mixer accepted the removal.
    self.effects.remove(&effect_id);
    Ok(())
}
/// Removes all effects of the given mixer (the main mixer when `mixer_id` is `None`).
///
/// # Errors
/// Stops at and returns the first error reported by [`Player::remove_effect`].
pub fn remove_all_effects<M: Into<Option<MixerId>>>(
    &mut self,
    mixer_id: M,
) -> Result<(), Error> {
    let target_mixer_id = mixer_id.into().unwrap_or(Self::MAIN_MIXER_ID);
    self.effects_of(target_mixer_id)
        .into_iter()
        .try_for_each(|effect_id| self.remove_effect(effect_id))
}
/// Stops all transient sources and asks every mixer to remove its pending events.
///
/// Non-transient sources (generators added via `add_generator`) stay registered.
/// Failures to deliver a stop message are ignored. Always returns `Ok(())`.
pub fn stop_all_sources(&mut self) -> Result<(), Error> {
    // Collect the ids first, so that no map guard is held while removing entries.
    let transient_source_ids: Vec<PlaybackId> = self
        .playing_sources
        .iter()
        .filter(|entry| entry.value().is_transient)
        .map(|entry| *entry.key())
        .collect();
    for playback_id in transient_source_ids {
        if let Some((_, playing_source)) = self.playing_sources.remove(&playback_id) {
            let _ = playing_source.playback_message_queue.send_stop();
        }
    }
    for mixer in self.mixers.iter() {
        // `force_push` hands back a displaced message when the queue was full.
        let queue_was_full = mixer
            .value()
            .event_queue
            .force_push(MixerMessage::RemoveAllPendingEvents)
            .is_some();
        if queue_was_full {
            log::warn!("Mixer's event queue is full.");
            log::warn!("Increase the mixer event queue to prevent this from happening...");
        }
    }
    Ok(())
}
/// Shared implementation of [`Player::play_generator`] and [`Player::add_generator`]:
/// wraps the generator and sends it to the given mixer.
///
/// # Arguments
/// * `generator` - the generator to add or play.
/// * `is_transient` - transient flag which is set on the generator and passed to the mixer.
/// * `mixer_id` - id of the mixer which should play the generator.
/// * `start_time` - sample time which is passed along with the generator to the
///   mixer; `None` is passed as `0`.
///
/// # Errors
/// Fails when the playback options are invalid, the mixer is unknown or the mixer's
/// event queue is full.
fn add_or_play_generator<G: Generator + 'static, T: Into<Option<u64>>>(
    &mut self,
    generator: G,
    is_transient: bool,
    mixer_id: MixerId,
    start_time: T,
) -> Result<GeneratorPlaybackHandle, Error> {
    // Validate the options and resolve the mixer before touching any state.
    let playback_options = *generator.playback_options();
    playback_options.validate()?;
    let mixer_event_queue = self.mixer_event_queue(mixer_id)?;

    // Prepare the generator for status reporting and fetch its identity.
    let mut generator = generator;
    generator.set_is_transient(is_transient);
    generator.set_playback_status_sender(Some(self.playback_status_sender.clone()));
    let playback_id = generator.playback_id();
    let source_message_queue = generator.playback_message_queue();
    let source_name = format!("Generator '{}'", generator.generator_name());

    // Wrap the generator: convert -> amplify -> pan -> measure.
    let converted = ConvertedSource::new(
        generator,
        self.output_channel_count(),
        self.output_sample_rate(),
        ResamplingQuality::Default,
    );
    let amplified = AmplifiedSource::new(converted, playback_options.volume);
    let volume_message_queue = amplified.message_queue();
    let panned = PannedSource::new(amplified, playback_options.panning);
    let panning_message_queue = panned.message_queue();
    // Only measure the CPU load when the generator's playback options ask for it.
    let measure_interval = self
        .config
        .measuring_interval
        .filter(|_| playback_options.measure_cpu_load);
    let measured = MeasuredSource::new(panned, measure_interval);
    let measurement_state = measured.state();

    let playback_message_queue = PlaybackMessageQueue::Generator {
        playback: source_message_queue,
        volume: volume_message_queue,
        panning: panning_message_queue,
    };

    // Register the generator before it gets sent to the mixer.
    let is_playing = Arc::new(AtomicBool::new(true));
    let playing_source = PlayingSource {
        is_playing: Arc::clone(&is_playing),
        is_transient,
        playback_message_queue: playback_message_queue.clone(),
        mixer_id,
        source_name,
    };
    self.playing_sources.insert(playback_id, playing_source);

    let source = Owned::new(&self.collector_handle, measured.into_box());
    let sample_time = start_time.into().unwrap_or(0);
    let add_source_message = MixerMessage::AddSource {
        is_transient,
        playback_id,
        playback_message_queue: playback_message_queue.clone(),
        source,
        sample_time,
    };
    if mixer_event_queue.push(add_source_message).is_err() {
        // Roll back the registration when the mixer did not accept the generator.
        self.playing_sources.remove(&playback_id);
        return Err(Self::mixer_event_queue_error("play_generator"));
    }
    Ok(GeneratorPlaybackHandle::new(
        is_playing,
        playback_id,
        playback_message_queue,
        mixer_event_queue,
        self.collector_handle.clone(),
        measurement_state,
    ))
}
/// Spawns the thread which processes playback status events.
///
/// Returns a proxy sender for the sources along with the thread's join handle.
/// The thread removes stopped sources from `playing_sources` and forwards every
/// event to `playback_sender`, if one is set. It runs until `running` is cleared or
/// all proxy senders got dropped.
///
/// # Panics
/// Panics when the thread could not be spawned.
fn handle_playback_events(
    playback_sender: Option<SyncSender<PlaybackStatusEvent>>,
    playing_sources: Arc<DashMap<PlaybackId, PlayingSource>>,
    running: Arc<AtomicBool>,
) -> (SyncSender<PlaybackStatusEvent>, thread::JoinHandle<()>) {
    const DEFAULT_PLAYBACK_EVENTS_CAPACITY: usize = 2048;
    // Receive with a timeout, so that the `running` flag gets polled regularly.
    const RECEIVE_TIMEOUT: Duration = Duration::from_millis(100);

    let (proxy_sender, proxy_receiver) = sync_channel(DEFAULT_PLAYBACK_EVENTS_CAPACITY);
    let event_loop = move || {
        while running.load(atomic::Ordering::Acquire) {
            let event = match proxy_receiver.recv_timeout(RECEIVE_TIMEOUT) {
                Ok(event) => event,
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            };
            // Stopped sources are no longer tracked by the player.
            if let PlaybackStatusEvent::Stopped { id, .. } = event {
                playing_sources.remove(&id);
            }
            if let Some(sender) = &playback_sender {
                if let Err(err) = sender.send(event) {
                    log::warn!("Failed to send file status message: {err}");
                }
            }
        }
        log::info!("Playback event loop stopped");
    };
    let thread_handle = std::thread::Builder::new()
        .name("audio_player_messages".to_string())
        .spawn(event_loop)
        .expect("Failed to spawn audio message thread");
    (proxy_sender, thread_handle)
}
/// Spawns the thread which frees dropped `basedrop` allocations.
///
/// The thread calls `collect` on the collector every 100 ms while `running` is set.
/// After that it collects once more and tries to clean up the collector, logging a
/// warning when the cleanup fails.
///
/// # Panics
/// Panics when the thread could not be spawned.
fn handle_drop_collects(
    mut collector: Collector,
    running: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
    std::thread::Builder::new()
        .name("audio_player_drops".to_string())
        .spawn(move || {
            while running.load(atomic::Ordering::Acquire) {
                collector.collect();
                thread::sleep(Duration::from_millis(100));
            }
            // Final pass for everything which got dropped during shutdown.
            collector.collect();
            if collector.try_cleanup().is_err() {
                log::warn!("Failed to cleanup collector. Some handles will be leaked...");
            }
            log::info!("Audio collector loop stopped");
        })
        .expect("Failed to spawn audio collector thread")
}
fn mixer_event_queue(&self, mixer_id: MixerId) -> Result<Arc<ArrayQueue<MixerMessage>>, Error> {
Ok(self
.mixers
.get(&mixer_id)
.ok_or(Error::MixerNotFoundError(mixer_id))?
.event_queue
.clone())
}
/// Logs warnings about a full mixer event queue and returns the matching
/// `Error::SendError`. `event_name` names the event which could not be sent.
fn mixer_event_queue_error(event_name: &str) -> Error {
log::warn!("Mixer's event queue is full. Failed to send a {event_name} event.");
log::warn!("Increase the mixer event queue to prevent this from happening...");
Error::SendError("Mixer queue is full".to_string())
}
fn effect_mixer_event_queue(
&self,
effect_id: EffectId,
) -> Result<Arc<ArrayQueue<MixerMessage>>, Error> {
let effect_info = *self
.effects
.get(&effect_id)
.ok_or(Error::EffectNotFoundError(effect_id))?
.value();
self.mixer_event_queue(effect_info.mixer_id)
}
fn mixer_parent_id(&self, mixer_id: MixerId) -> Result<MixerId, Error> {
self.mixers
.get(&mixer_id)
.map(|entry| entry.value().parent_id)
.ok_or(Error::MixerNotFoundError(mixer_id))
}
/// Returns the ids of all direct sub-mixers of the given mixer, in no particular order.
fn sub_mixers_of(&self, mixer_id: MixerId) -> Vec<MixerId> {
    self.mixers
        .iter()
        // The main mixer is registered as its own parent, so skip it explicitly.
        .filter(|entry| {
            entry.value().parent_id == mixer_id && *entry.key() != Player::MAIN_MIXER_ID
        })
        .map(|entry| *entry.key())
        .collect()
}
fn effect_parent_mixer_id(&self, effect_id: EffectId) -> Result<MixerId, Error> {
self.effects
.get(&effect_id)
.map(|entry| entry.value().mixer_id)
.ok_or(Error::EffectNotFoundError(effect_id))
}
/// Returns the ids of all effects which were added to the given mixer, in no
/// particular order.
fn effects_of(&self, mixer_id: MixerId) -> Vec<EffectId> {
    self.effects
        .iter()
        .filter(|entry| entry.value().mixer_id == mixer_id)
        .map(|entry| *entry.key())
        .collect()
}
/// Returns a new id from a process-wide counter. Ids start at `1` and increase by
/// one with every call.
fn unique_id() -> usize {
    static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
    NEXT_ID.fetch_add(1, atomic::Ordering::Relaxed)
}
/// Returns a new, unique mixer id. Mixer and effect ids are drawn from the same counter.
fn unique_mixer_id() -> MixerId {
Self::unique_id()
}
/// Returns a new, unique effect id. Mixer and effect ids are drawn from the same counter.
fn unique_effect_id() -> EffectId {
Self::unique_id()
}
}
impl Drop for Player {
    /// Shuts down the player: stops the output device, waits up to 5 seconds for the
    /// main mixer to get dropped, stops both helper threads and finally closes the
    /// output device.
    fn drop(&mut self) {
        log::info!("Releasing player's main mixer...");
        self.output_device.stop();
        // Poll the main mixer's drop signal, but never block forever.
        let mut waited_ms = 0_usize;
        while !self.main_mixer_dropped.load(atomic::Ordering::Acquire) {
            thread::sleep(Duration::from_millis(100));
            waited_ms += 100;
            if waited_ms >= 5000 {
                log::warn!("Timed out waiting for player's main mixer to drop");
                break;
            }
        }

        log::info!("Stopping player's playback status thread...");
        self.playback_status_running
            .store(false, atomic::Ordering::Release);
        if let Some(handle) = self.playback_status_thread.take() {
            let _ = handle.join();
        }

        log::info!("Stopping player's collector thread...");
        // Replace our handle with one of a throwaway collector, so that the handle of
        // the collector thread's collector gets dropped before the thread cleans up.
        self.collector_handle = Collector::new().handle();
        self.collector_running
            .store(false, atomic::Ordering::Release);
        if let Some(handle) = self.collector_thread.take() {
            let _ = handle.join();
        }

        log::info!("Closing output device...");
        self.output_device.close();
    }
}
/// Displays the player's mixer graph, starting at the main mixer.
impl fmt::Display for Player {
// Writes the main mixer and, recursively, everything below it.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.display_mixer(f, Self::MAIN_MIXER_ID, 0)
}
}
impl Player {
fn display_mixer(
&self,
f: &mut fmt::Formatter<'_>,
mixer_id: MixerId,
indent_level: usize,
) -> fmt::Result {
let indent = " ".repeat(indent_level);
let child_indent = " ".repeat(indent_level + 1);
if mixer_id == Self::MAIN_MIXER_ID {
writeln!(f, "{}- Main Mixer (ID: {})", indent, mixer_id)?;
} else {
writeln!(f, "{}- Sub-Mixer (ID: {})", indent, mixer_id)?;
}
let mut sub_mixers = self.sub_mixers_of(mixer_id);
sub_mixers.sort();
for sub_mixer_id in sub_mixers {
self.display_mixer(f, sub_mixer_id, indent_level + 1)?;
}
let sources_on_mixer: Vec<_> = self
.playing_sources
.iter()
.filter(|entry| entry.value().mixer_id == mixer_id)
.collect();
if !sources_on_mixer.is_empty() {
writeln!(f, "{}> Sources:", child_indent)?;
let item_indent = " ".repeat(indent_level + 2);
let mut grouped_sources: HashMap<String, Vec<PlaybackId>> = HashMap::new();
for source_entry in sources_on_mixer {
let source_id = *source_entry.key();
let source_info = source_entry.value();
grouped_sources
.entry(source_info.source_name.clone())
.or_default()
.push(source_id);
}
let mut sorted_grouped_sources: Vec<_> = grouped_sources.into_iter().collect();
sorted_grouped_sources.sort_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
for (source_name, mut ids) in sorted_grouped_sources {
ids.sort();
let ids_str = ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
writeln!(f, "{}- {} (ID: {})", item_indent, source_name, ids_str)?;
}
}
let mut effects_on_mixer: Vec<_> = self
.effects
.iter()
.filter(|entry| entry.value().mixer_id == mixer_id)
.collect();
effects_on_mixer.sort_by_key(|e| *e.key());
if !effects_on_mixer.is_empty() {
writeln!(f, "{}^ Effects:", child_indent)?;
let item_indent = " ".repeat(indent_level + 2);
for effect_entry in effects_on_mixer {
let effect_id = effect_entry.key();
let effect_info = effect_entry.value();
writeln!(
f,
"{}- {} (ID: {})",
item_indent, effect_info.effect_name, effect_id
)?;
}
}
Ok(())
}
}