dango_core/music_player/
music_player.rs

1use std::sync::mpsc::{self, Sender, Receiver};
2use std::thread;
3use std::io::SeekFrom;
4
5use async_std::io::ReadExt;
6use async_std::task;
7
8use symphonia::core::codecs::{CODEC_TYPE_NULL, DecoderOptions, Decoder};
9use symphonia::core::formats::{FormatOptions, FormatReader, SeekMode, SeekTo};
10use symphonia::core::io::{MediaSourceStream, MediaSource};
11use symphonia::core::meta::MetadataOptions;
12use symphonia::core::probe::Hint;
13use symphonia::core::errors::Error;
14use symphonia::core::units::Time;
15
16use futures::AsyncBufRead;
17
18use crate::music_player::music_output::AudioStream;
19use crate::music_processor::music_processor::MusicProcessor;
20use crate::music_storage::music_db::URI;
21
22// Struct that controls playback of music
23pub struct MusicPlayer {
24    pub music_processor: MusicProcessor,
25    player_status: PlayerStatus,
26    message_sender: Option<Sender<PlayerMessage>>,
27    status_receiver: Option<Receiver<PlayerStatus>>,
28}
29
/// Playback state of the player, as reported over the status channel.
///
/// Derives `PartialEq`/`Eq` so callers can compare statuses directly and
/// `Debug` so a status can be logged or used in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerStatus {
    /// Audio is being decoded and played.
    Playing,
    /// Playback is suspended until a `Play` or `Stop` message arrives.
    Paused,
    /// Playback has ended; no decode thread is running.
    Stopped,
    /// Playback failed.
    Error,
}
37
38pub enum PlayerMessage {
39    Play,
40    Pause,
41    Stop,
42    SeekTo(u64),
43    DSP(DSPMessage)
44}
45
46pub enum DSPMessage {
47    UpdateProcessor(Box<MusicProcessor>)
48}
49
50impl MusicPlayer {
51    pub fn new() -> Self {
52        MusicPlayer {
53            music_processor: MusicProcessor::new(),
54            player_status: PlayerStatus::Stopped,
55            message_sender: None,
56            status_receiver: None,
57        }
58    }
59    
60    // Opens and plays song with given path in separate thread
61    pub fn open_song(&mut self, uri: &URI) {
62        // Creates mpsc channels to communicate with thread
63        let (message_sender, message_receiver) = mpsc::channel();
64        let (status_sender, status_receiver) = mpsc::channel();
65        self.message_sender = Some(message_sender);
66        self.status_receiver = Some(status_receiver);
67        
68        let owned_uri = uri.clone();
69
70        // Creates thread that audio is decoded in
71        thread::spawn(move || {
72            let (mut reader, mut decoder) = MusicPlayer::get_reader_and_dec(&owned_uri);
73            
74            let mut seek_time: Option<u64> = None;
75            
76            let mut audio_output: Option<Box<dyn AudioStream>> = None;
77            
78            let mut music_processor = MusicProcessor::new();
79            
80            'main_decode: loop {    
81                // Handles message received from the MusicPlayer if there is one // TODO: Refactor
82                let received_message = message_receiver.try_recv();
83                match received_message {
84                    Ok(PlayerMessage::Pause) => { 
85                        status_sender.send(PlayerStatus::Paused).unwrap();
86                        // Loops on a blocking message receiver to wait for a play/stop message
87                        'inner_pause: loop {
88                            let message = message_receiver.try_recv();
89                            match message {
90                                Ok(PlayerMessage::Play) => {
91                                    status_sender.send(PlayerStatus::Playing).unwrap();
92                                    break 'inner_pause
93                                },
94                                Ok(PlayerMessage::Stop) => {
95                                    status_sender.send(PlayerStatus::Stopped).unwrap();
96                                    break 'main_decode
97                                },
98                                _ => {},
99                            }
100                        }
101                    },
102                    // Exits main decode loop and subsequently ends thread (?)
103                    Ok(PlayerMessage::Stop) => {
104                        status_sender.send(PlayerStatus::Stopped).unwrap();
105                        break 'main_decode
106                    },
107                    Ok(PlayerMessage::SeekTo(time)) => seek_time = Some(time),
108                    Ok(PlayerMessage::DSP(dsp_message)) => {
109                        match dsp_message {
110                            DSPMessage::UpdateProcessor(new_processor) => music_processor = *new_processor,
111                        }
112                    }
113                    _ => {},
114                } 
115                
116                match seek_time {
117                    Some(time) => {
118                        let seek_to = SeekTo::Time { time: Time::from(time), track_id: Some(0) };
119                        reader.seek(SeekMode::Accurate, seek_to).unwrap();
120                        seek_time = None;
121                    }
122                    None => {} //Nothing to do!
123                }
124                
125                let packet = match reader.next_packet() {
126                    Ok(packet) => packet,
127                    Err(Error::ResetRequired) => panic!(), //TODO,
128                    Err(err) => {
129                        //Unrecoverable?
130                        panic!("{}", err);
131                    }
132                };
133                
134                match decoder.decode(&packet) {
135                    Ok(decoded) => {
136                        // Opens audio stream if there is not one
137                        if audio_output.is_none() {
138                            let spec = *decoded.spec();
139                            let duration = decoded.capacity() as u64;
140                            
141                            audio_output.replace(crate::music_player::music_output::open_stream(spec, duration).unwrap());
142                        }
143                        
144                        // Handles audio normally provided there is an audio stream
145                        if let Some(ref mut audio_output) = audio_output {
146                            // Changes buffer of the MusicProcessor if the packet has a differing capacity or spec
147                            if music_processor.audio_buffer.capacity() != decoded.capacity() ||music_processor.audio_buffer.spec() != decoded.spec() {
148                                let spec = *decoded.spec();
149                                let duration = decoded.capacity() as u64;
150                                
151                                music_processor.set_buffer(duration, spec);
152                            }
153                            
154                            let transformed_audio = music_processor.process(&decoded);
155                            
156                            // Writes transformed packet to audio out
157                            audio_output.write(transformed_audio).unwrap()
158                        }
159                    },
160                    Err(Error::IoError(_)) => {
161                        // rest in peace packet
162                        continue;
163                    },
164                    Err(Error::DecodeError(_)) => {
165                        // may you one day be decoded
166                        continue;
167                    },
168                    Err(err) => {
169                        // Unrecoverable, though shouldn't panic here
170                        panic!("{}", err);
171                    }
172                }
173            }
174        });
175    }
176    
177    fn get_reader_and_dec(uri: &URI) -> (Box<dyn FormatReader>, Box<dyn Decoder>) {
178        // Opens remote/local source and creates MediaSource for symphonia
179        let config = RemoteOptions { media_buffer_len: 10000, forward_buffer_len: 10000};
180        let src: Box<dyn MediaSource> = match uri {
181            URI::Local(path) => Box::new(std::fs::File::open(path).expect("Failed to open file")),
182            URI::Remote(_, location) => Box::new(RemoteSource::new(location.as_ref(), &config).unwrap()),
183        };
184        
185        let mss = MediaSourceStream::new(src, Default::default());
186        
187        // Use default metadata and format options
188        let meta_opts: MetadataOptions = Default::default();
189        let fmt_opts: FormatOptions = Default::default();
190
191        let mut hint = Hint::new();
192        
193        let probed = symphonia::default::get_probe().format(&hint, mss, &fmt_opts, &meta_opts).expect("Unsupported format");
194        
195        let mut reader  = probed.format;
196        
197        let track = reader.tracks()
198                    .iter()
199                    .find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
200                    .expect("no supported audio tracks");
201                    
202        let dec_opts: DecoderOptions = Default::default();
203        
204        let mut decoder = symphonia::default::get_codecs().make(&track.codec_params, &dec_opts)
205                                                    .expect("unsupported codec");
206        
207        return (reader, decoder);
208    }
209    
210    // Updates status by checking on messages from spawned thread
211    fn update_status(&mut self) {
212        let status = self.status_receiver.as_mut().unwrap().try_recv();
213        if status.is_ok() {
214            self.player_status = status.unwrap();
215            match status.unwrap() {
216                // Removes receiver and sender since spawned thread no longer exists
217                PlayerStatus::Stopped => {
218                    self.status_receiver = None;
219                    self.message_sender = None;
220                }
221                _ => {}
222            }
223        }
224    }
225    
226    // Sends message to spawned thread
227    pub fn send_message(&mut self, message: PlayerMessage) {
228        self.update_status();
229        // Checks that message sender exists before sending a message off
230        if self.message_sender.is_some() {
231            self.message_sender.as_mut().unwrap().send(message).unwrap();
232        }
233    }
234    
235    pub fn get_status(&mut self) -> PlayerStatus {
236        self.update_status();
237        return self.player_status;
238    }
239}
240
// TODO: Make the buffer length do anything
/// Options for remote sources.
pub struct RemoteOptions {
    /// How many bytes are to be buffered in total.
    media_buffer_len: u64,
    /// How many bytes may be buffered ahead of the seek position before the
    /// remote source stops being read from.
    forward_buffer_len: u64,
}
251
252impl Default for RemoteOptions {
253    fn default() -> Self {
254        RemoteOptions {
255            media_buffer_len: 100000,
256            forward_buffer_len: 1024,
257        }
258    }   
259}
260
261/// A remote source of media
262struct RemoteSource {
263    reader: Box<dyn AsyncBufRead + Send + Sync + Unpin>,
264    media_buffer: Vec<u8>,
265    forward_buffer_len: u64,
266    offset: u64,
267}
268
269impl RemoteSource {
270    /// Creates a new RemoteSource with given uri and configuration
271    pub fn new(uri: &str, config: &RemoteOptions) -> Result<Self, surf::Error> {
272        let mut response = task::block_on(async { 
273            return surf::get(uri).await;
274        })?;
275        
276        let reader = response.take_body().into_reader();
277        
278        Ok(RemoteSource {
279            reader,
280            media_buffer: Vec::new(),
281            forward_buffer_len: config.forward_buffer_len,
282            offset: 0,
283        })
284    }
285}
286// TODO: refactor this + buffer into the buffer passed into the function, not a newly allocated one
287impl std::io::Read for RemoteSource {
288    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
289        // Reads bytes into the media buffer if the offset is within the specified distance from the end of the buffer
290        if self.media_buffer.len() as u64 - self.offset < self.forward_buffer_len {
291            let mut buffer = [0; 1024];
292            let read_bytes = task::block_on(async {
293                match self.reader.read_exact(&mut buffer).await {
294                    Ok(_) => {
295                        self.media_buffer.extend_from_slice(&buffer);
296                        return Ok(());
297                    },
298                    Err(err) => return Err(err),
299                }
300            });
301            match read_bytes {
302                Err(err) => return Err(err),
303                _ => {},
304            }
305        }
306        // Reads bytes from the media buffer into the buffer given by 
307        let mut bytes_read = 0;
308        for location in 0..1024 {
309            if (location + self.offset as usize) < self.media_buffer.len() {
310                buf[location] = self.media_buffer[location + self.offset as usize];
311                bytes_read += 1;
312            }
313        }
314        
315        self.offset += bytes_read;
316        return Ok(bytes_read as usize);
317    }
318}
319
320impl std::io::Seek for RemoteSource {
321    // Seeks to a given position
322    // Seeking past the internal buffer's length results in the seeking to the end of content
323    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
324        match pos {
325            // Offset is set to given position
326            SeekFrom::Start(pos) => {
327                if pos > self.media_buffer.len() as u64{
328                    self.offset = self.media_buffer.len() as u64;
329                } else {
330                    self.offset = pos;
331                }
332                return Ok(self.offset);
333            },
334            // Offset is set to length of buffer + given position
335            SeekFrom::End(pos) => {
336                if self.media_buffer.len() as u64 + pos as u64 > self.media_buffer.len() as u64 {
337                    self.offset = self.media_buffer.len() as u64;
338                } else {
339                    self.offset = self.media_buffer.len() as u64 + pos as u64;
340                }
341                return Ok(self.offset);
342            },
343            // Offset is set to current offset + given position
344            SeekFrom::Current(pos) => {
345                if self.offset + pos as u64 > self.media_buffer.len() as u64{
346                    self.offset = self.media_buffer.len() as u64;
347                } else {
348                    self.offset += pos as u64
349                }
350                return Ok(self.offset);
351            },
352        }
353    }
354}
355
356impl MediaSource for RemoteSource {
357    fn is_seekable(&self) -> bool {
358        return true;
359    }
360    
361    fn byte_len(&self) -> Option<u64> {
362        return None;
363    }
364}