use std::sync::mpsc::{self, Sender, Receiver};
use std::thread;
use std::io::SeekFrom;
use async_std::io::ReadExt;
use async_std::task;
use symphonia::core::codecs::{CODEC_TYPE_NULL, DecoderOptions, Decoder};
use symphonia::core::formats::{FormatOptions, FormatReader, SeekMode, SeekTo};
use symphonia::core::io::{MediaSourceStream, MediaSource};
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;
use symphonia::core::errors::Error;
use symphonia::core::units::Time;
use futures::AsyncBufRead;
use crate::music_player::music_output::AudioStream;
use crate::music_processor::music_processor::MusicProcessor;
use crate::music_storage::music_db::URI;
pub struct MusicPlayer {
pub music_processor: MusicProcessor,
player_status: PlayerStatus,
message_sender: Option<Sender<PlayerMessage>>,
status_receiver: Option<Receiver<PlayerStatus>>,
}
#[derive(Clone, Copy)]
pub enum PlayerStatus {
Playing,
Paused,
Stopped,
Error,
}
pub enum PlayerMessage {
Play,
Pause,
Stop,
SeekTo(u64),
DSP(DSPMessage)
}
pub enum DSPMessage {
UpdateProcessor(Box<MusicProcessor>)
}
impl MusicPlayer {
pub fn new() -> Self {
MusicPlayer {
music_processor: MusicProcessor::new(),
player_status: PlayerStatus::Stopped,
message_sender: None,
status_receiver: None,
}
}
pub fn open_song(&mut self, uri: &URI) {
let (message_sender, message_receiver) = mpsc::channel();
let (status_sender, status_receiver) = mpsc::channel();
self.message_sender = Some(message_sender);
self.status_receiver = Some(status_receiver);
let owned_uri = uri.clone();
thread::spawn(move || {
let (mut reader, mut decoder) = MusicPlayer::get_reader_and_dec(&owned_uri);
let mut seek_time: Option<u64> = None;
let mut audio_output: Option<Box<dyn AudioStream>> = None;
let mut music_processor = MusicProcessor::new();
'main_decode: loop {
let received_message = message_receiver.try_recv();
match received_message {
Ok(PlayerMessage::Pause) => {
status_sender.send(PlayerStatus::Paused).unwrap();
'inner_pause: loop {
let message = message_receiver.try_recv();
match message {
Ok(PlayerMessage::Play) => {
status_sender.send(PlayerStatus::Playing).unwrap();
break 'inner_pause
},
Ok(PlayerMessage::Stop) => {
status_sender.send(PlayerStatus::Stopped).unwrap();
break 'main_decode
},
_ => {},
}
}
},
Ok(PlayerMessage::Stop) => {
status_sender.send(PlayerStatus::Stopped).unwrap();
break 'main_decode
},
Ok(PlayerMessage::SeekTo(time)) => seek_time = Some(time),
Ok(PlayerMessage::DSP(dsp_message)) => {
match dsp_message {
DSPMessage::UpdateProcessor(new_processor) => music_processor = *new_processor,
}
}
_ => {},
}
match seek_time {
Some(time) => {
let seek_to = SeekTo::Time { time: Time::from(time), track_id: Some(0) };
reader.seek(SeekMode::Accurate, seek_to).unwrap();
seek_time = None;
}
None => {} }
let packet = match reader.next_packet() {
Ok(packet) => packet,
Err(Error::ResetRequired) => panic!(), Err(err) => {
panic!("{}", err);
}
};
match decoder.decode(&packet) {
Ok(decoded) => {
if audio_output.is_none() {
let spec = *decoded.spec();
let duration = decoded.capacity() as u64;
audio_output.replace(crate::music_player::music_output::open_stream(spec, duration).unwrap());
}
if let Some(ref mut audio_output) = audio_output {
if music_processor.audio_buffer.capacity() != decoded.capacity() ||music_processor.audio_buffer.spec() != decoded.spec() {
let spec = *decoded.spec();
let duration = decoded.capacity() as u64;
music_processor.set_buffer(duration, spec);
}
let transformed_audio = music_processor.process(&decoded);
audio_output.write(transformed_audio).unwrap()
}
},
Err(Error::IoError(_)) => {
continue;
},
Err(Error::DecodeError(_)) => {
continue;
},
Err(err) => {
panic!("{}", err);
}
}
}
});
}
fn get_reader_and_dec(uri: &URI) -> (Box<dyn FormatReader>, Box<dyn Decoder>) {
let config = RemoteOptions { media_buffer_len: 10000, forward_buffer_len: 10000};
let src: Box<dyn MediaSource> = match uri {
URI::Local(path) => Box::new(std::fs::File::open(path).expect("Failed to open file")),
URI::Remote(_, location) => Box::new(RemoteSource::new(location.as_ref(), &config).unwrap()),
};
let mss = MediaSourceStream::new(src, Default::default());
let meta_opts: MetadataOptions = Default::default();
let fmt_opts: FormatOptions = Default::default();
let mut hint = Hint::new();
let probed = symphonia::default::get_probe().format(&hint, mss, &fmt_opts, &meta_opts).expect("Unsupported format");
let mut reader = probed.format;
let track = reader.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.expect("no supported audio tracks");
let dec_opts: DecoderOptions = Default::default();
let mut decoder = symphonia::default::get_codecs().make(&track.codec_params, &dec_opts)
.expect("unsupported codec");
return (reader, decoder);
}
fn update_status(&mut self) {
let status = self.status_receiver.as_mut().unwrap().try_recv();
if status.is_ok() {
self.player_status = status.unwrap();
match status.unwrap() {
PlayerStatus::Stopped => {
self.status_receiver = None;
self.message_sender = None;
}
_ => {}
}
}
}
pub fn send_message(&mut self, message: PlayerMessage) {
self.update_status();
if self.message_sender.is_some() {
self.message_sender.as_mut().unwrap().send(message).unwrap();
}
}
pub fn get_status(&mut self) -> PlayerStatus {
self.update_status();
return self.player_status;
}
}
pub struct RemoteOptions {
media_buffer_len: u64,
forward_buffer_len: u64,
}
impl Default for RemoteOptions {
fn default() -> Self {
RemoteOptions {
media_buffer_len: 100000,
forward_buffer_len: 1024,
}
}
}
struct RemoteSource {
reader: Box<dyn AsyncBufRead + Send + Sync + Unpin>,
media_buffer: Vec<u8>,
forward_buffer_len: u64,
offset: u64,
}
impl RemoteSource {
pub fn new(uri: &str, config: &RemoteOptions) -> Result<Self, surf::Error> {
let mut response = task::block_on(async {
return surf::get(uri).await;
})?;
let reader = response.take_body().into_reader();
Ok(RemoteSource {
reader,
media_buffer: Vec::new(),
forward_buffer_len: config.forward_buffer_len,
offset: 0,
})
}
}
impl std::io::Read for RemoteSource {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.media_buffer.len() as u64 - self.offset < self.forward_buffer_len {
let mut buffer = [0; 1024];
let read_bytes = task::block_on(async {
match self.reader.read_exact(&mut buffer).await {
Ok(_) => {
self.media_buffer.extend_from_slice(&buffer);
return Ok(());
},
Err(err) => return Err(err),
}
});
match read_bytes {
Err(err) => return Err(err),
_ => {},
}
}
let mut bytes_read = 0;
for location in 0..1024 {
if (location + self.offset as usize) < self.media_buffer.len() {
buf[location] = self.media_buffer[location + self.offset as usize];
bytes_read += 1;
}
}
self.offset += bytes_read;
return Ok(bytes_read as usize);
}
}
impl std::io::Seek for RemoteSource {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
match pos {
SeekFrom::Start(pos) => {
if pos > self.media_buffer.len() as u64{
self.offset = self.media_buffer.len() as u64;
} else {
self.offset = pos;
}
return Ok(self.offset);
},
SeekFrom::End(pos) => {
if self.media_buffer.len() as u64 + pos as u64 > self.media_buffer.len() as u64 {
self.offset = self.media_buffer.len() as u64;
} else {
self.offset = self.media_buffer.len() as u64 + pos as u64;
}
return Ok(self.offset);
},
SeekFrom::Current(pos) => {
if self.offset + pos as u64 > self.media_buffer.len() as u64{
self.offset = self.media_buffer.len() as u64;
} else {
self.offset += pos as u64
}
return Ok(self.offset);
},
}
}
}
impl MediaSource for RemoteSource {
fn is_seekable(&self) -> bool {
return true;
}
fn byte_len(&self) -> Option<u64> {
return None;
}
}