use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant, SystemTime},
};
use crossbeam::channel::{self, Receiver, Sender, TryRecvError};
use lunar_lib::{
log::error,
runners::{DispatchError, Executor, Runner},
};
use selene_core::symphonia_helpers::raw_decoder::{Decoder, SymphoniaError};
use crate::{
SHUTDOWN,
ipc_common::{AudioPacket, TrackInfo},
session::{DecoderMessageExt, SessionEventExt, SessionEventThread, SessionMainThread},
should_shutdown, wait,
};
/// Owner-side handle to the decoder thread started by [`DecoderHandle::open`].
pub(crate) struct DecoderHandle {
/// Command side of the `lunar_lib::runners::channel()` pair; the decoder thread drains the
/// matching `Executor` and runs each command against its `DecoderInner`.
pub runner: Runner<DecoderInner>,
/// Receiving end of the bounded (capacity 2) packet channel that the decoder thread fills
/// with `AudioPacket`s.
pub audio_buffer: Receiver<AudioPacket>,
}
/// State owned by the decoder thread; it is moved into the thread and consumed by `run`.
pub(crate) struct DecoderInner {
/// Source of the commands that `run` executes against this state.
exec: Executor<Self>,
/// Used at the end of a track to ask the session for a preloaded decoder.
session_runner: Runner<SessionMainThread>,
/// Used to emit playback events (now playing, scrobble, track changed, stopped,
/// playing-state changed).
event_runner: Runner<SessionEventThread>,
/// The currently loaded decoder; `None` until a `load` command arrives and after `stop`.
decoder: Option<Decoder>,
/// `(unix seconds, instant)` captured when the first packet of the current track is decoded.
/// `None` means the track start has not been announced yet. The unix value is passed to the
/// scrobble event; the instant is the reference point for pacing.
start_time: Option<(u64, Instant)>,
/// Whether the scrobble event has already been emitted for the current track.
sent_scrobble: bool,
/// Sending end of the bounded packet channel whose receiver is `DecoderHandle::audio_buffer`.
packet_buffer: Sender<AudioPacket>,
/// Shared flag; packets are only decoded while it is `true`.
is_playing: Arc<AtomicBool>,
/// Shared flag; when `true`, reaching the end of the track seeks back to the start.
looping: Arc<AtomicBool>,
}
impl DecoderHandle {
    /// Starts the decoder thread and returns the handle used to talk to it.
    ///
    /// `playback_state` and `looping` are flags shared with the caller; the thread reads (and,
    /// for `playback_state`, also writes) them. `session_runner` and `event_runner` are kept by
    /// the thread so it can request preloads and emit playback events.
    ///
    /// The thread runs `DecoderInner::run` to completion. An error is logged, and in every case
    /// the global `SHUTDOWN` flag is set once `run` has returned.
    pub(crate) fn open(
        playback_state: Arc<AtomicBool>,
        looping: Arc<AtomicBool>,
        session_runner: Runner<SessionMainThread>,
        event_runner: Runner<SessionEventThread>,
    ) -> Self {
        // Command channel: `runner` stays with the caller, `exec` moves into the thread.
        let (runner, exec) = lunar_lib::runners::channel();
        // Packet channel: at most two packets may be queued ahead of the consumer.
        let (packet_tx, packet_rx) = channel::bounded(2);

        let worker = DecoderInner {
            exec,
            session_runner,
            event_runner,
            decoder: None,
            start_time: None,
            sent_scrobble: false,
            packet_buffer: packet_tx,
            is_playing: playback_state,
            looping,
        };

        thread::spawn(move || {
            match worker.run() {
                Ok(()) => {}
                Err(err) => {
                    error!("DecoderHandle failed with error: {err}");
                }
            }
            // The decoder loop ending, for whatever reason, takes the whole process down.
            SHUTDOWN.store(true, Ordering::Relaxed);
        });

        Self {
            runner,
            audio_buffer: packet_rx,
        }
    }
}
impl DecoderInner {
    /// Main loop of the decoder thread.
    ///
    /// Each iteration first runs every queued command, then, if a decoder is loaded and
    /// playback is enabled, decodes one packet and pushes it into the packet channel. Without a
    /// decoder, or while paused, the loop blocks on the command channel instead.
    ///
    /// Returns `Ok(())` when shutdown is requested or the command channel is disconnected.
    ///
    /// # Errors
    ///
    /// Propagates decoder errors (decode and seek), errors from dispatching to the session and
    /// event runners, and a disconnected packet channel.
    ///
    /// # Panics
    ///
    /// Panics if the system clock is before the Unix epoch, or if the packet channel or the
    /// event runner is disconnected while the start of a track is being announced.
    fn run(mut self) -> anyhow::Result<()> {
        while !should_shutdown() {
            // Run everything that is already queued before touching the decoder, so commands
            // such as `load`, `seek` or `stop` take effect before the next packet is decoded.
            loop {
                match self.exec.try_recv() {
                    Ok(command) => command(&mut self),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => return Ok(()),
                }
            }

            let decoder = if let Some(ref mut decoder) = self.decoder
                && self.is_playing.load(Ordering::Relaxed)
            {
                decoder
            } else {
                // Nothing to decode: block until the next command arrives instead of spinning.
                match self.exec.recv() {
                    Ok(command) => command(&mut self),
                    Err(_) => return Ok(()),
                }
                continue;
            };

            // The consumer is behind; back off and go around again so commands keep being
            // serviced while waiting for room.
            if self.packet_buffer.is_full() {
                wait();
                continue;
            }

            if let Some(packet) = decoder.decode_next_packet()? {
                let mut frames = vec![0.0; packet.samples_interleaved()];
                packet.copy_to_slice_interleaved(&mut frames);

                // First packet of a track: record when it started, send the stream parameters
                // ahead of the audio and emit the now-playing event.
                let (start_unix, start_instant) = *self.start_time.get_or_insert_with(|| {
                    let time = SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .expect("Time went backwards")
                        .as_secs();
                    let instant = Instant::now();
                    self.packet_buffer
                        .send(AudioPacket::TrackInfo(TrackInfo::new(
                            decoder.stream.codec_params.sample_rate,
                            decoder.stream.codec_params.channels as u16,
                            None,
                            None,
                        )))
                        .expect("Session disconnected at the worst time");
                    self.event_runner
                        .now_playing()
                        .expect("Session disconnected at the worst time");
                    (time, instant)
                });

                // Scrobble once per track, as soon as half of it or 240 (presumably seconds)
                // have been decoded, whichever comes first.
                let decoded_time = decoder.decoded_time();
                if !self.sent_scrobble
                    && (decoded_time >= (decoder.stream.duration() / 2.0) || decoded_time >= 240.0)
                {
                    self.sent_scrobble = true;
                    self.event_runner.scrobble(start_unix)?;
                }

                // Blocking send on purpose. The `is_full` check above only guarantees one free
                // slot, and on the first packet of a track the `TrackInfo` packet may just have
                // taken it. A `try_send` would then fail with `Full` and end the whole decoder
                // thread; `send` simply waits for the consumer to take a packet.
                self.packet_buffer.send(AudioPacket::Audio(frames))?;

                // Pacing: do not let the decoder run ahead of the wall clock. `expected` is the
                // decoder position in seconds, `elapsed` the time since the track's anchor.
                let expected = decoder.current_frame as f64
                    / f64::from(decoder.stream.codec_params.sample_rate);
                let elapsed = start_instant.elapsed().as_secs_f64();
                if expected > elapsed {
                    thread::sleep(Duration::from_secs_f64(expected - elapsed));
                }
            } else {
                // End of stream.
                if self.looping.load(Ordering::Relaxed) {
                    // Restart the same track and reset the per-track state so it is announced
                    // and scrobbled again.
                    decoder.seek(0.0, false)?;
                    self.start_time = None;
                    self.sent_scrobble = false;
                    decoder.decoded_frames = 0;
                    continue;
                }

                if let Some((preload, source)) = self.session_runner.request_preload()? {
                    // Continue straight into the preloaded track.
                    *decoder = preload;
                    self.start_time = None;
                    self.sent_scrobble = false;
                    self.event_runner.currently_playing_changed(source)?;
                } else {
                    // Nothing left to play: let the consumer drain the remaining packets
                    // before reporting that playback has stopped.
                    while !self.packet_buffer.is_empty() {
                        wait();
                    }
                    self.is_playing.store(false, Ordering::SeqCst);
                    self.event_runner.playback_stopped()?;
                }
            }
        }
        Ok(())
    }
}
// Commands that operate on the decoder thread's `DecoderInner` state.
// NOTE(review): the expansion of `make_runner_command_ext!` is not visible from this file;
// presumably it generates the `DecoderCommandExt` trait so these bodies can be dispatched
// through a `Runner<DecoderInner>` — confirm in lunar_lib.
// Plain `//` comments are used inside the macro input on purpose: they are discarded by the
// lexer, whereas `///` would be turned into `#[doc]` tokens that the macro may not accept.
lunar_lib::make_runner_command_ext!(DecoderInner => pub(crate) trait DecoderCommandExt {
// Installs `decoder` as the current track and clears the per-track state, so the next decoded
// packet announces the track again and scrobbling starts over.
fn load(&mut self, decoder: Decoder) -> () {
self.decoder = Some(decoder);
self.start_time = None;
self.sent_scrobble = false;
}
// Forwards `time` and `increment` to the loaded decoder's `seek` and returns the time it
// reports, or `Ok(None)` when no decoder is loaded.
fn seek(&mut self, time: f64, increment: bool) -> Result<Option<f64>, SymphoniaError> {
if let Some(ref mut decoder) = self.decoder {
let time = decoder.seek(time, increment)?;
// Re-anchor the pacing instant so that its elapsed time equals the decoder's new position
// (`current_frame / sample_rate`); the unix start timestamp is kept unchanged.
// NOTE(review): `checked_sub(..).unwrap()` panics if the instant cannot be moved back that
// far, which depends on the platform's `Instant` representation — confirm this is acceptable.
self.start_time = self.start_time.map(|(unix, _)| {
let new_instant = Instant::now()
.checked_sub(Duration::from_secs_f64(
decoder.current_frame as f64
/ f64::from(decoder.stream.codec_params.sample_rate),
))
.unwrap();
(unix, new_instant)
});
Ok(Some(time))
} else {
Ok(None)
}
}
// Returns the loaded decoder's `time()`, or `None` when no decoder is loaded.
fn get_time(&mut self) -> Option<f64> {
self.decoder.as_ref().map(Decoder::time)
}
// Drops the current decoder, clears the playing flag and emits the playback-stopped event.
fn stop(&mut self) -> Result<(), DispatchError> {
self.decoder = None;
self.is_playing.store(false, Ordering::SeqCst);
self.event_runner.playback_stopped()?;
Ok(())
}
// Sets the playing flag and emits the playing-changed event with the decoder's current time.
// Without a loaded decoder nothing is changed or emitted and `Ok(false)` is returned.
fn set_playing(&mut self, is_playing: bool) -> Result<bool, DispatchError> {
if let Some(ref decoder) = self.decoder {
self.is_playing.store(is_playing, Ordering::Relaxed);
self.event_runner.playback_is_playing_changed(is_playing, decoder.time())?;
Ok(is_playing)
} else {
Ok(false)
}
}
// Flips the playing flag and returns the new state. Without a loaded decoder the flag is left
// untouched, a "not playing at 0.0" event is emitted and `Ok(false)` is returned.
fn toggle_playing(&mut self) -> Result<bool, DispatchError> {
if let Some(ref decoder) = self.decoder {
// `fetch_not` returns the previous value, so negating it yields the state just stored.
let state = !self.is_playing.fetch_not(Ordering::Relaxed);
self.event_runner.playback_is_playing_changed(state, decoder.time())?;
Ok(state)
} else {
self.event_runner.playback_is_playing_changed(false, 0.0)?;
Ok(false)
}
}
});