use std::{
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU32, Ordering},
mpsc::{Receiver, Sender, TryRecvError, channel},
},
thread,
};
use lunar_lib::error;
use ringbuf::traits::Observer;
use selene_core::library::track::ResolvedTrack;
use crate::{
SHUTDOWN,
event_handler::EventTx,
player::{
AtomicPlaybackStatus, OpenedDecoder, PlaybackStatus, PlayerError, PlayerEvent,
PlayerRequest, PlayerTx,
playback::{CpalHandle, DeviceConfig},
},
wait,
};
pub struct DecoderHandle {
rx: Receiver<DecoderCommand>,
player_tx: Sender<PlayerRequest>,
event_tx: Sender<PlayerEvent>,
cpal_handle: Option<CpalHandle>,
device_config: Arc<Mutex<DeviceConfig>>,
current_decoder: Option<OpenedDecoder>,
preload_decoder: Option<OpenedDecoder>,
playback_state: Arc<AtomicPlaybackStatus>,
looping: Arc<AtomicBool>,
volume: Arc<AtomicU32>,
}
impl DecoderHandle {
pub(crate) fn open(
player_tx: Sender<PlayerRequest>,
event_tx: Sender<PlayerEvent>,
device_config: Arc<Mutex<DeviceConfig>>,
playback_state: Arc<AtomicPlaybackStatus>,
volume: Arc<AtomicU32>,
looping: Arc<AtomicBool>,
) -> Result<Sender<DecoderCommand>, PlayerError> {
let (tx, rx) = channel();
let handle = Self {
rx,
player_tx,
event_tx,
cpal_handle: None,
device_config,
current_decoder: None,
preload_decoder: None,
playback_state,
looping,
volume,
};
thread::spawn(move || {
if let Err(err) = handle.run() {
error!("DecoderHandle failed with error: {err}");
}
SHUTDOWN.store(true, Ordering::Relaxed);
});
Ok(tx)
}
fn run(mut self) -> Result<(), PlayerError> {
loop {
if SHUTDOWN.load(Ordering::Relaxed) {
return Ok(());
}
loop {
match self.rx.try_recv() {
Ok(command) => self.run_command(command)?,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Ok(()),
}
}
if !matches!(
self.playback_state.load(Ordering::Relaxed),
PlaybackStatus::Playing
) {
wait();
continue;
}
if let Some(cpal_handle) = self.cpal_handle.as_mut()
&& let Some(finished_consuming) = cpal_handle.consume_packet()
&& !finished_consuming
{
wait();
continue;
}
let Some(mut decoder) = self.current_decoder.take() else {
if let Some(ref cpal_handle) = self.cpal_handle
&& !cpal_handle.audio_buf.is_empty()
{
wait();
continue;
}
self.cpal_handle = None;
self.playback_state
.store(PlaybackStatus::Stopped, Ordering::SeqCst);
self.event_tx.event(PlayerEvent::PlaybackStopped);
continue;
};
let mut cpal_handle = self.get_cpal_handle()?;
if decoder.at_eof {
if self.looping.load(Ordering::Relaxed) {
decoder.seek(0.0, false)?;
self.current_decoder = Some(decoder);
self.cpal_handle = Some(cpal_handle);
continue;
}
if let Some(preload) = self.preload_decoder.take() {
self.player_tx
.decoder_event(DecoderEvent::PreloadConsumed)?;
let currently_playing = preload.decoded_from.clone();
decoder = preload;
self.event_tx
.event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
} else {
self.cpal_handle = Some(cpal_handle);
continue;
}
}
if decoder.started() {
self.player_tx.decoder_event(DecoderEvent::NowPlaying {
track: decoder.decoded_from.clone(),
})?
};
let packet = decoder.decode_next_packet()?;
cpal_handle.pending_packet = packet;
let decoded_time = decoder.decoded_time();
if (decoded_time >= (decoder.decoded_from.track.container().stream().duration() / 2.0)
|| decoded_time >= 240.0)
&& !decoder.sent_scrobble
{
decoder.sent_scrobble = true;
self.player_tx.decoder_event(DecoderEvent::Scrobble {
track: decoder.decoded_from.clone(),
start_time: decoder.start_time().unwrap(),
})?
}
self.cpal_handle = Some(cpal_handle);
self.current_decoder = Some(decoder);
}
}
fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
match command {
DecoderCommand::Load { decoder } => {
let currently_playing = decoder.decoded_from.clone();
self.current_decoder = Some(*decoder);
self.event_tx
.event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
}
DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
DecoderCommand::LoadAndPreload { load, preload } => {
let currently_playing = load.decoded_from.clone();
self.current_decoder = Some(*load);
self.preload_decoder = Some(*preload);
self.event_tx
.event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
}
DecoderCommand::SetPlaying(playing) => {
if let Some(ref decoder) = self.current_decoder {
let new_state = if playing {
PlaybackStatus::Playing
} else {
PlaybackStatus::Paused
};
if self.playback_state.load(Ordering::SeqCst) == new_state {
return Ok(());
}
self.playback_state.store(new_state, Ordering::SeqCst);
self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
is_playing: playing,
changed_at: decoder.time(),
});
}
}
DecoderCommand::TogglePlaying { callback } => {
let current_state = self.playback_state.load(Ordering::SeqCst);
if let Some(ref decoder) = self.current_decoder {
let new_state = match current_state {
PlaybackStatus::Playing => PlaybackStatus::Paused,
PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
};
self.playback_state.store(new_state, Ordering::SeqCst);
callback.send(new_state)?;
self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
is_playing: matches!(new_state, PlaybackStatus::Playing),
changed_at: decoder.time(),
});
} else {
callback.send(PlaybackStatus::Stopped)?;
}
}
DecoderCommand::Seek {
time,
increment,
callback,
} => {
if let Some(ref mut opened_decoder) = self.current_decoder {
let time = opened_decoder.seek(time, increment)?;
self.event_tx.event(PlayerEvent::SeekOccured { time });
callback.send(Some(time))?;
} else {
callback.send(None)?;
}
}
DecoderCommand::GetTime { callback } => {
if let Some(ref opened_decoder) = self.current_decoder {
callback.send(Some(opened_decoder.time()))?;
} else {
callback.send(None)?;
}
}
DecoderCommand::GetPlaying { callback } => {
if let Some(ref opened_decoder) = self.current_decoder {
callback.send(Some(opened_decoder.decoded_from.clone()))?;
} else {
callback.send(None)?;
}
}
DecoderCommand::Stop => {
self.current_decoder = None;
self.preload_decoder = None;
self.playback_state
.store(PlaybackStatus::Stopped, Ordering::SeqCst);
self.event_tx.event(PlayerEvent::PlaybackStopped);
}
DecoderCommand::Skip => {
if let Some(take) = self.preload_decoder.take() {
self.player_tx
.decoder_event(DecoderEvent::PreloadConsumed)?;
let currently_playing = take.decoded_from.clone();
self.current_decoder = Some(take);
self.event_tx
.event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
} else {
self.current_decoder = None;
}
}
}
Ok(())
}
}
impl DecoderHandle {
pub fn get_cpal_handle(&mut self) -> Result<CpalHandle, PlayerError> {
if let Some(current) = self.cpal_handle.take() {
Ok(current)
} else {
CpalHandle::open(&*self.device_config.lock()?, self.volume.clone())
}
}
}
pub enum DecoderEvent {
PreloadConsumed,
Scrobble {
track: Arc<ResolvedTrack>,
start_time: u64,
},
NowPlaying {
track: Arc<ResolvedTrack>,
},
}
pub enum DecoderCommand {
Load {
decoder: Box<OpenedDecoder>,
},
Preload {
decoder: Box<OpenedDecoder>,
},
LoadAndPreload {
load: Box<OpenedDecoder>,
preload: Box<OpenedDecoder>,
},
SetPlaying(bool),
TogglePlaying {
callback: Sender<PlaybackStatus>,
},
Seek {
time: f64,
increment: bool,
callback: Sender<Option<f64>>,
},
GetTime {
callback: Sender<Option<f64>>,
},
GetPlaying {
callback: Sender<Option<Arc<ResolvedTrack>>>,
},
Skip,
Stop,
}
pub trait DecoderTx {
fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
fn load_and_preload(
&self,
load: OpenedDecoder,
preload: OpenedDecoder,
) -> Result<(), PlayerError>;
fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
fn get_time(&self) -> Result<Option<f64>, PlayerError>;
fn get_playing(&self) -> Result<Option<Arc<ResolvedTrack>>, PlayerError>;
fn stop(&self) -> Result<(), PlayerError>;
fn skip(&self) -> Result<(), PlayerError>;
}
impl DecoderTx for Sender<DecoderCommand> {
fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
self.send(DecoderCommand::Load {
decoder: Box::new(decoder),
})?;
Ok(())
}
fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
self.send(DecoderCommand::Preload {
decoder: Box::new(decoder),
})?;
Ok(())
}
fn load_and_preload(
&self,
load: OpenedDecoder,
preload: OpenedDecoder,
) -> Result<(), PlayerError> {
self.send(DecoderCommand::LoadAndPreload {
load: Box::new(load),
preload: Box::new(preload),
})?;
Ok(())
}
fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
self.send(DecoderCommand::SetPlaying(is_playing))?;
Ok(())
}
fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
let (tx, rx) = channel();
self.send(DecoderCommand::TogglePlaying { callback: tx })?;
Ok(rx.recv()?)
}
fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
let (tx, rx) = channel();
self.send(DecoderCommand::Seek {
time,
increment,
callback: tx,
})?;
Ok(rx.recv()?)
}
fn get_time(&self) -> Result<Option<f64>, PlayerError> {
let (tx, rx) = channel();
self.send(DecoderCommand::GetTime { callback: tx })?;
Ok(rx.recv()?)
}
fn get_playing(&self) -> Result<Option<Arc<ResolvedTrack>>, PlayerError> {
let (tx, rx) = channel();
self.send(DecoderCommand::GetPlaying { callback: tx })?;
Ok(rx.recv()?)
}
fn stop(&self) -> Result<(), PlayerError> {
self.send(DecoderCommand::Stop)?;
Ok(())
}
fn skip(&self) -> Result<(), PlayerError> {
self.send(DecoderCommand::Skip)?;
Ok(())
}
}