use std::collections::VecDeque;
use std::sync::{Arc, Mutex, mpsc};
use std::time::Duration;
use anyhow::Result;
use rodio::{Decoder, DeviceSinkBuilder, Player};
use crate::tap::SampleTap;
/// Queue of `f32` samples shared between the player thread and the caller of
/// `spawn_player`. The player thread hands a clone of it to every `SampleTap`
/// it creates; presumably the tap fills it with the audio being played
/// (see `crate::tap` — not visible from this file).
type SampleBuffer = Arc<Mutex<VecDeque<f32>>>;
/// Commands accepted by the player thread over the channel returned from
/// `spawn_player`.
#[derive(Debug)]
pub enum PlayerCommand {
/// Stop whatever is playing, download and decode `url`, then start playing it.
/// `duration` is reported as `total` in subsequent `Progress` events.
/// `gen` tags the request: when several `PlayUrl` commands are queued
/// back-to-back only the most recent one is actually played.
PlayUrl { url: String, duration: Option<Duration>, gen: u64 },
/// Download and decode `url` and append it behind the current track.
/// `duration` becomes the reported total once playback advances to it.
EnqueueNext { url: String, duration: Option<Duration> },
/// Pause playback.
Pause,
/// Resume playback.
Resume,
/// Stop playback and clear all per-track bookkeeping.
Stop,
/// Forwarded unchanged to the player's `set_volume`.
SetVolume(f32),
/// Try to seek the current track to the given position.
Seek(Duration),
/// Stop playback and terminate the player thread.
Quit,
}
/// Events emitted by the player thread.
#[derive(Debug)]
pub enum PlayerEvent {
/// A `PlayUrl` request was downloaded, decoded and handed to the player.
TrackStarted,
/// Sent on every poll (the thread sleeps 500 ms between polls) while the
/// player is neither paused nor empty. `elapsed` is the player position;
/// `total` is the duration supplied with the command, if any.
Progress { elapsed: Duration, total: Option<Duration> },
/// Sent once per track when its total is known, 10 s or less (but more than
/// zero) remain, and no next track has been queued yet.
AboutToFinish,
/// Playback moved on to the track queued with `EnqueueNext`; detected when
/// the position drops from above 2 s to below 2 s between two polls.
TrackAdvanced,
/// The player became empty after having been playing.
TrackEnded,
/// Human-readable description of an audio-device, download or decode failure.
Error(String),
}
/// Starts the dedicated `playterm-player` thread and returns everything the
/// caller needs to talk to it.
///
/// The returned tuple holds, in order:
/// 1. the sender for [`PlayerCommand`]s,
/// 2. the receiver for [`PlayerEvent`]s,
/// 3. the join handle of the player thread,
/// 4. the sample buffer shared with the player thread.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
pub fn spawn_player() -> (
    mpsc::Sender<PlayerCommand>,
    mpsc::Receiver<PlayerEvent>,
    std::thread::JoinHandle<()>,
    SampleBuffer,
) {
    let (command_sender, command_receiver) = mpsc::channel();
    let (event_sender, event_receiver) = mpsc::channel();
    let samples: SampleBuffer = Arc::new(Mutex::new(VecDeque::with_capacity(4096)));

    let worker = {
        // The thread gets its own handle to the buffer; ours is returned below.
        let samples = Arc::clone(&samples);
        std::thread::Builder::new()
            .name(String::from("playterm-player"))
            .spawn(move || player_thread(command_receiver, event_sender, samples))
            .expect("failed to spawn player thread")
    };

    (command_sender, event_receiver, worker, samples)
}
/// Body of the player thread: owns the audio output and the [`Player`],
/// executes [`PlayerCommand`]s and reports [`PlayerEvent`]s.
///
/// Every iteration of the main loop
/// 1. executes all commands currently waiting on `cmd_rx`,
/// 2. emits progress / track-transition events for the playing track,
/// 3. sleeps for `POLL_INTERVAL`.
///
/// The thread ends on [`PlayerCommand::Quit`], when the command channel is
/// disconnected, or right away (after emitting [`PlayerEvent::Error`]) when the
/// default audio device cannot be opened. A failure to deliver an event is
/// ignored on purpose: it only means the receiver is gone.
fn player_thread(
    cmd_rx: mpsc::Receiver<PlayerCommand>,
    evt_tx: mpsc::Sender<PlayerEvent>,
    sample_buffer: SampleBuffer,
) {
    // Pause between two polls of the command channel and playback position.
    const POLL_INTERVAL: Duration = Duration::from_millis(500);
    // A queued track counts as started when the position falls from above
    // this value to below it between two polls.
    const ADVANCE_WINDOW: Duration = Duration::from_secs(2);
    // `AboutToFinish` is sent once at most this much of the track remains.
    const ABOUT_TO_FINISH_WINDOW: Duration = Duration::from_secs(10);

    let mut device = match DeviceSinkBuilder::open_default_sink() {
        Ok(d) => d,
        Err(e) => {
            let _ = evt_tx.send(PlayerEvent::Error(format!("audio device error: {e}")));
            return;
        }
    };
    device.log_on_drop(false);
    let player = Player::connect_new(&device.mixer());

    let mut current_total: Option<Duration> = None;
    let mut was_playing = false;
    let mut next_total: Option<Duration> = None;
    let mut next_queued = false;
    let mut about_to_finish_sent = false;
    let mut prev_elapsed = Duration::ZERO;
    let mut skip_gen: u64 = 0;
    // Command that `play_url` had to take off the channel while looking for
    // newer `PlayUrl` requests. It is executed before anything else is read
    // from the channel so that no command is lost or reordered.
    let mut deferred: Option<PlayerCommand> = None;

    'outer: loop {
        loop {
            use mpsc::TryRecvError;
            let cmd = match deferred.take() {
                Some(cmd) => cmd,
                None => match cmd_rx.try_recv() {
                    Ok(cmd) => cmd,
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => break 'outer,
                },
            };
            match cmd {
                PlayerCommand::Quit => break 'outer,
                PlayerCommand::PlayUrl { url, duration, gen } => {
                    deferred = play_url(
                        url,
                        duration,
                        gen,
                        &cmd_rx,
                        &mut skip_gen,
                        &player,
                        &evt_tx,
                        &mut current_total,
                        &mut was_playing,
                        &mut next_total,
                        &mut next_queued,
                        &mut about_to_finish_sent,
                        &mut prev_elapsed,
                        &sample_buffer,
                    );
                }
                other => handle_command(
                    other,
                    &player,
                    &evt_tx,
                    &mut current_total,
                    &mut was_playing,
                    &mut next_total,
                    &mut next_queued,
                    &mut about_to_finish_sent,
                    &mut prev_elapsed,
                    &sample_buffer,
                ),
            }
        }

        if !player.is_paused() && !player.empty() {
            let elapsed = player.get_pos();

            // The position jumping back to (almost) zero while a next track is
            // queued means the player moved on to that track.
            if next_queued && prev_elapsed > ADVANCE_WINDOW && elapsed < ADVANCE_WINDOW {
                current_total = next_total.take();
                next_queued = false;
                about_to_finish_sent = false;
                let _ = evt_tx.send(PlayerEvent::TrackAdvanced);
            }
            prev_elapsed = elapsed;

            let _ = evt_tx.send(PlayerEvent::Progress {
                elapsed,
                total: current_total,
            });

            if !about_to_finish_sent && !next_queued {
                if let Some(total) = current_total {
                    let remaining = total.saturating_sub(elapsed);
                    if remaining <= ABOUT_TO_FINISH_WINDOW && remaining > Duration::ZERO {
                        about_to_finish_sent = true;
                        let _ = evt_tx.send(PlayerEvent::AboutToFinish);
                    }
                }
            }
            was_playing = true;
        }

        if was_playing && player.empty() {
            was_playing = false;
            current_total = None;
            next_total = None;
            next_queued = false;
            about_to_finish_sent = false;
            prev_elapsed = Duration::ZERO;
            let _ = evt_tx.send(PlayerEvent::TrackEnded);
        }

        std::thread::sleep(POLL_INTERVAL);
    }

    player.stop();
    drop(player);
    drop(device);
}

/// Replaces the current playback with the track at `url`.
///
/// `PlayUrl` requests that are already waiting on `cmd_rx`, or that arrive
/// while the track is being downloaded, supersede the one being processed:
/// only the most recent request is actually played. `skip_gen` is updated to
/// the generation of the latest request seen.
///
/// On success `current_total` is set to the duration of the started track and
/// [`PlayerEvent::TrackStarted`] is sent. A download or decode failure is
/// reported as [`PlayerEvent::Error`] and leaves the player stopped.
///
/// # Returns
///
/// Looking for newer requests means taking commands off the channel. The
/// first command that is *not* a `PlayUrl` ends that search and is returned so
/// the caller can execute it next; nothing further is read from the channel
/// once such a command is held, which keeps commands in order. When the
/// returned command is `Quit` or `Stop`, no track has been started, because it
/// would be torn down immediately anyway.
// One mutable reference per piece of player-thread state, mirroring
// `handle_command`; bundling them would change both call sites.
#[allow(clippy::too_many_arguments)]
#[must_use = "the returned command was already removed from the channel"]
fn play_url(
    url: String,
    duration: Option<Duration>,
    gen: u64,
    cmd_rx: &mpsc::Receiver<PlayerCommand>,
    skip_gen: &mut u64,
    player: &Player,
    evt_tx: &mpsc::Sender<PlayerEvent>,
    current_total: &mut Option<Duration>,
    was_playing: &mut bool,
    next_total: &mut Option<Duration>,
    next_queued: &mut bool,
    about_to_finish_sent: &mut bool,
    prev_elapsed: &mut Duration,
    sample_buffer: &SampleBuffer,
) -> Option<PlayerCommand> {
    *skip_gen = gen;
    let mut target_url = url;
    let mut target_duration = duration;
    let mut target_gen = gen;
    let mut deferred: Option<PlayerCommand> = None;

    loop {
        // Collapse a burst of queued requests into the most recent one before
        // paying for a download.
        while deferred.is_none() {
            match cmd_rx.try_recv() {
                Ok(PlayerCommand::PlayUrl { url: u, duration: d, gen: g }) => {
                    target_url = u;
                    target_duration = d;
                    target_gen = g;
                    *skip_gen = g;
                }
                Ok(other) => deferred = Some(other),
                // Empty or disconnected: nothing more to coalesce. The caller
                // notices a disconnect on its next read.
                Err(_) => break,
            }
        }
        if matches!(deferred, Some(PlayerCommand::Quit | PlayerCommand::Stop)) {
            // The caller stops the player when it executes the command.
            return deferred;
        }

        player.stop();
        *current_total = None;
        *was_playing = false;
        *next_total = None;
        *next_queued = false;
        *about_to_finish_sent = false;
        *prev_elapsed = Duration::ZERO;

        let source = match download_and_decode(&target_url) {
            Ok(source) => source,
            Err(e) => {
                let _ = evt_tx.send(PlayerEvent::Error(format!("playback error: {e}")));
                return deferred;
            }
        };

        // The download blocks, so newer requests may have arrived meanwhile.
        let mut newer: Option<(String, Option<Duration>, u64)> = None;
        while deferred.is_none() {
            match cmd_rx.try_recv() {
                Ok(PlayerCommand::PlayUrl { url: u, duration: d, gen: g }) => {
                    *skip_gen = g;
                    newer = Some((u, d, g));
                }
                Ok(other) => deferred = Some(other),
                Err(_) => break,
            }
        }
        if matches!(deferred, Some(PlayerCommand::Quit | PlayerCommand::Stop)) {
            return deferred;
        }

        if *skip_gen != target_gen {
            // Superseded while downloading: discard the decoded track (dropped
            // at the end of this iteration) and start over with the newest
            // request.
            if let Some((u, d, g)) = newer {
                target_url = u;
                target_duration = d;
                target_gen = g;
                continue;
            }
            return deferred;
        }

        *current_total = target_duration;
        player.append(SampleTap::new(source, sample_buffer.clone()));
        player.play();
        let _ = evt_tx.send(PlayerEvent::TrackStarted);
        return deferred;
    }
}
fn handle_command(
cmd: PlayerCommand,
player: &Player,
evt_tx: &mpsc::Sender<PlayerEvent>,
current_total: &mut Option<Duration>,
was_playing: &mut bool,
next_total: &mut Option<Duration>,
next_queued: &mut bool,
about_to_finish_sent: &mut bool,
prev_elapsed: &mut Duration,
sample_buffer: &SampleBuffer,
) {
match cmd {
PlayerCommand::PlayUrl { .. } => {
unreachable!("PlayUrl must be dispatched via play_url()");
}
PlayerCommand::EnqueueNext { url, duration } => {
match download_and_decode(&url) {
Ok(source) => {
*next_total = duration;
*next_queued = true;
let tapped = SampleTap::new(source, sample_buffer.clone());
player.append(tapped);
}
Err(e) => {
let _ = evt_tx.send(PlayerEvent::Error(format!("enqueue error: {e}")));
}
}
}
PlayerCommand::Pause => player.pause(),
PlayerCommand::Resume => player.play(),
PlayerCommand::Stop => {
player.stop();
*current_total = None;
*next_total = None;
*next_queued = false;
*about_to_finish_sent = false;
*prev_elapsed = Duration::ZERO;
*was_playing = false;
}
PlayerCommand::SetVolume(v) => player.set_volume(v),
PlayerCommand::Seek(pos) => {
let _ = player.try_seek(pos);
*prev_elapsed = pos;
}
PlayerCommand::Quit => {
unreachable!("Quit must be handled in the outer command-drain loop");
}
}
}
/// Downloads the whole resource at `url` into memory and wraps it in a decoder.
///
/// This call blocks until the complete body has been received.
///
/// # Errors
///
/// Returns an error when the request fails, when the server answers with a
/// 4xx/5xx status, when the body cannot be read, or when the decoder cannot
/// be built from the downloaded bytes.
fn download_and_decode(url: &str) -> Result<Decoder<std::io::Cursor<Vec<u8>>>> {
    // Reject HTTP error responses up front; otherwise the error page would be
    // handed to the decoder and surface as a misleading decode failure.
    let bytes = reqwest::blocking::get(url)?
        .error_for_status()?
        .bytes()?;
    let byte_len = bytes.len() as u64;
    let cursor = std::io::Cursor::new(bytes.to_vec());
    let decoder = Decoder::builder()
        .with_data(cursor)
        .with_byte_len(byte_len)
        .with_coarse_seek(true)
        .build()?;
    Ok(decoder)
}