use std::time::{Duration, Instant};
use bytes::Bytes;
use futures::future::FutureExt;
use tokio::sync::oneshot;
use tokio::{io::AsyncReadExt, process, sync::mpsc, task::unconstrained};
use crate::audio_dir::MediaFile;
use crate::ffmpeg::audio_from_url;
use crate::user::Command;
#[derive(Debug)]
pub struct Transport {
files: Vec<MediaFile>,
cursor: usize,
tx_transport_cmd: mpsc::UnboundedSender<TransportCommand>,
rx_transport_cmd: mpsc::UnboundedReceiver<TransportCommand>,
tx_audio: mpsc::Sender<Bytes>,
}
#[derive(Debug)]
pub enum TransportCommand {
NextTrack,
PrevTrack,
NowPlaying(oneshot::Sender<MediaFile>),
}
pub struct TransportHandle {
pub tx_transport_cmd: mpsc::UnboundedSender<TransportCommand>,
}
impl TransportHandle {
pub fn next_track(&self) -> Result<(), anyhow::Error> {
self.tx_transport_cmd.send(TransportCommand::NextTrack)?;
Ok(())
}
pub fn prev_track(&self) -> Result<(), anyhow::Error> {
self.tx_transport_cmd.send(TransportCommand::PrevTrack)?;
Ok(())
}
pub async fn now_playing(&self) -> Result<MediaFile, anyhow::Error> {
let (tx, rx) = oneshot::channel();
self.tx_transport_cmd
.send(TransportCommand::NowPlaying(tx))?;
Ok(rx.await?)
}
}
impl Transport {
pub fn new(tx_audio: mpsc::Sender<Bytes>) -> Self {
let (tx_transport_cmd, rx_transport_cmd) = mpsc::unbounded_channel();
Self {
files: Vec::new(),
cursor: 0,
tx_audio,
tx_transport_cmd,
rx_transport_cmd,
}
}
pub fn get_handle(&self) -> TransportHandle {
TransportHandle {
tx_transport_cmd: self.tx_transport_cmd.clone(),
}
}
pub fn extend(&mut self, files: Vec<MediaFile>) {
self.files.extend(files);
}
pub fn now_playing(&self) -> &MediaFile {
&self.files[self.cursor]
}
pub async fn run(mut self, tx_cmd: mpsc::UnboundedSender<Command>) {
let mut buf = [0u8; 4096];
'play: loop {
let mut audio_reader = match self.get_reader_at_cursor().await {
Ok(reader) => reader,
Err(_) => break 'play,
};
let path = std::path::PathBuf::from(&self.now_playing().location);
tx_cmd
.send(Command::Print(
path.file_name().unwrap().to_string_lossy().to_string(), ))
.unwrap();
const DOUBLE_TAP: Duration = Duration::from_millis(333);
let start_time = Instant::now();
while let Ok(len) = audio_reader.read(&mut buf).await {
if len == 0 {
self.cursor += 1;
continue 'play;
} else {
let chunk = Bytes::copy_from_slice(&buf[0..len]);
self.tx_audio.send(chunk).await.unwrap();
}
if let Some(Some(cmd)) = unconstrained(self.rx_transport_cmd.recv()).now_or_never()
{
match cmd {
TransportCommand::NextTrack => {
if self.cursor < self.files.len() - 1 {
self.cursor += 1;
}
continue 'play;
}
TransportCommand::PrevTrack => {
if start_time.elapsed() < DOUBLE_TAP {
self.cursor = self.cursor.saturating_sub(1);
}
continue 'play;
}
TransportCommand::NowPlaying(tx) => {
tx.send(self.now_playing().clone()).unwrap();
}
}
}
}
self.cursor += 1;
}
tx_cmd.send(Command::Quit).unwrap();
}
async fn get_reader_at_cursor(&mut self) -> Result<process::ChildStdout, anyhow::Error> {
if self.cursor >= self.files.len() {
return Err(anyhow::Error::msg("cursor index out of bounds, just quit"));
}
let file_url = &self.now_playing().location;
let mut file = audio_from_url(file_url).await.spawn()?;
file.stdout
.take()
.ok_or_else(|| anyhow::Error::msg("could not take music file's stdout"))
}
}