// dango_core/music_player/music_player.rs
use std::io::ErrorKind;
use std::io::SeekFrom;
use std::sync::mpsc::{self, Sender, Receiver};
use std::thread;

use async_std::io::ReadExt;
use async_std::task;

use symphonia::core::codecs::{CODEC_TYPE_NULL, DecoderOptions, Decoder};
use symphonia::core::errors::Error;
use symphonia::core::formats::{FormatOptions, FormatReader, SeekMode, SeekTo};
use symphonia::core::io::{MediaSourceStream, MediaSource};
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;
use symphonia::core::units::Time;

use futures::AsyncBufRead;

use crate::music_player::music_output::AudioStream;
use crate::music_processor::music_processor::MusicProcessor;
use crate::music_storage::music_db::URI;
21
22pub struct MusicPlayer {
24 pub music_processor: MusicProcessor,
25 player_status: PlayerStatus,
26 message_sender: Option<Sender<PlayerMessage>>,
27 status_receiver: Option<Receiver<PlayerStatus>>,
28}
29
/// State of the playback thread as reported back to `MusicPlayer`.
///
/// `Debug`, `PartialEq` and `Eq` are derived so callers can log a status and
/// compare it with `==` instead of having to `match` on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerStatus {
    /// Audio is being decoded and written to the output.
    Playing,
    /// Playback is suspended until a `PlayerMessage::Play` arrives.
    Paused,
    /// Playback has ended; this is also the status of a freshly built player.
    Stopped,
    /// Playback failed.
    Error,
}
37
38pub enum PlayerMessage {
39 Play,
40 Pause,
41 Stop,
42 SeekTo(u64),
43 DSP(DSPMessage)
44}
45
46pub enum DSPMessage {
47 UpdateProcessor(Box<MusicProcessor>)
48}
49
50impl MusicPlayer {
51 pub fn new() -> Self {
52 MusicPlayer {
53 music_processor: MusicProcessor::new(),
54 player_status: PlayerStatus::Stopped,
55 message_sender: None,
56 status_receiver: None,
57 }
58 }
59
60 pub fn open_song(&mut self, uri: &URI) {
62 let (message_sender, message_receiver) = mpsc::channel();
64 let (status_sender, status_receiver) = mpsc::channel();
65 self.message_sender = Some(message_sender);
66 self.status_receiver = Some(status_receiver);
67
68 let owned_uri = uri.clone();
69
70 thread::spawn(move || {
72 let (mut reader, mut decoder) = MusicPlayer::get_reader_and_dec(&owned_uri);
73
74 let mut seek_time: Option<u64> = None;
75
76 let mut audio_output: Option<Box<dyn AudioStream>> = None;
77
78 let mut music_processor = MusicProcessor::new();
79
80 'main_decode: loop {
81 let received_message = message_receiver.try_recv();
83 match received_message {
84 Ok(PlayerMessage::Pause) => {
85 status_sender.send(PlayerStatus::Paused).unwrap();
86 'inner_pause: loop {
88 let message = message_receiver.try_recv();
89 match message {
90 Ok(PlayerMessage::Play) => {
91 status_sender.send(PlayerStatus::Playing).unwrap();
92 break 'inner_pause
93 },
94 Ok(PlayerMessage::Stop) => {
95 status_sender.send(PlayerStatus::Stopped).unwrap();
96 break 'main_decode
97 },
98 _ => {},
99 }
100 }
101 },
102 Ok(PlayerMessage::Stop) => {
104 status_sender.send(PlayerStatus::Stopped).unwrap();
105 break 'main_decode
106 },
107 Ok(PlayerMessage::SeekTo(time)) => seek_time = Some(time),
108 Ok(PlayerMessage::DSP(dsp_message)) => {
109 match dsp_message {
110 DSPMessage::UpdateProcessor(new_processor) => music_processor = *new_processor,
111 }
112 }
113 _ => {},
114 }
115
116 match seek_time {
117 Some(time) => {
118 let seek_to = SeekTo::Time { time: Time::from(time), track_id: Some(0) };
119 reader.seek(SeekMode::Accurate, seek_to).unwrap();
120 seek_time = None;
121 }
122 None => {} }
124
125 let packet = match reader.next_packet() {
126 Ok(packet) => packet,
127 Err(Error::ResetRequired) => panic!(), Err(err) => {
129 panic!("{}", err);
131 }
132 };
133
134 match decoder.decode(&packet) {
135 Ok(decoded) => {
136 if audio_output.is_none() {
138 let spec = *decoded.spec();
139 let duration = decoded.capacity() as u64;
140
141 audio_output.replace(crate::music_player::music_output::open_stream(spec, duration).unwrap());
142 }
143
144 if let Some(ref mut audio_output) = audio_output {
146 if music_processor.audio_buffer.capacity() != decoded.capacity() ||music_processor.audio_buffer.spec() != decoded.spec() {
148 let spec = *decoded.spec();
149 let duration = decoded.capacity() as u64;
150
151 music_processor.set_buffer(duration, spec);
152 }
153
154 let transformed_audio = music_processor.process(&decoded);
155
156 audio_output.write(transformed_audio).unwrap()
158 }
159 },
160 Err(Error::IoError(_)) => {
161 continue;
163 },
164 Err(Error::DecodeError(_)) => {
165 continue;
167 },
168 Err(err) => {
169 panic!("{}", err);
171 }
172 }
173 }
174 });
175 }
176
177 fn get_reader_and_dec(uri: &URI) -> (Box<dyn FormatReader>, Box<dyn Decoder>) {
178 let config = RemoteOptions { media_buffer_len: 10000, forward_buffer_len: 10000};
180 let src: Box<dyn MediaSource> = match uri {
181 URI::Local(path) => Box::new(std::fs::File::open(path).expect("Failed to open file")),
182 URI::Remote(_, location) => Box::new(RemoteSource::new(location.as_ref(), &config).unwrap()),
183 };
184
185 let mss = MediaSourceStream::new(src, Default::default());
186
187 let meta_opts: MetadataOptions = Default::default();
189 let fmt_opts: FormatOptions = Default::default();
190
191 let mut hint = Hint::new();
192
193 let probed = symphonia::default::get_probe().format(&hint, mss, &fmt_opts, &meta_opts).expect("Unsupported format");
194
195 let mut reader = probed.format;
196
197 let track = reader.tracks()
198 .iter()
199 .find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
200 .expect("no supported audio tracks");
201
202 let dec_opts: DecoderOptions = Default::default();
203
204 let mut decoder = symphonia::default::get_codecs().make(&track.codec_params, &dec_opts)
205 .expect("unsupported codec");
206
207 return (reader, decoder);
208 }
209
210 fn update_status(&mut self) {
212 let status = self.status_receiver.as_mut().unwrap().try_recv();
213 if status.is_ok() {
214 self.player_status = status.unwrap();
215 match status.unwrap() {
216 PlayerStatus::Stopped => {
218 self.status_receiver = None;
219 self.message_sender = None;
220 }
221 _ => {}
222 }
223 }
224 }
225
226 pub fn send_message(&mut self, message: PlayerMessage) {
228 self.update_status();
229 if self.message_sender.is_some() {
231 self.message_sender.as_mut().unwrap().send(message).unwrap();
232 }
233 }
234
235 pub fn get_status(&mut self) -> PlayerStatus {
236 self.update_status();
237 return self.player_status;
238 }
239}
240
/// Buffer sizing options for a remote (HTTP) media source.
///
/// The common value-type traits are derived so options can be copied,
/// compared and printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RemoteOptions {
    /// Length setting for the media buffer. NOTE(review): nothing visible in
    /// this file reads the field yet; confirm the intended unit and use.
    media_buffer_len: u64,
    /// Number of bytes `RemoteSource` tries to keep buffered ahead of the
    /// current read position.
    forward_buffer_len: u64,
}
251
252impl Default for RemoteOptions {
253 fn default() -> Self {
254 RemoteOptions {
255 media_buffer_len: 100000,
256 forward_buffer_len: 1024,
257 }
258 }
259}
260
261struct RemoteSource {
263 reader: Box<dyn AsyncBufRead + Send + Sync + Unpin>,
264 media_buffer: Vec<u8>,
265 forward_buffer_len: u64,
266 offset: u64,
267}
268
269impl RemoteSource {
270 pub fn new(uri: &str, config: &RemoteOptions) -> Result<Self, surf::Error> {
272 let mut response = task::block_on(async {
273 return surf::get(uri).await;
274 })?;
275
276 let reader = response.take_body().into_reader();
277
278 Ok(RemoteSource {
279 reader,
280 media_buffer: Vec::new(),
281 forward_buffer_len: config.forward_buffer_len,
282 offset: 0,
283 })
284 }
285}
286impl std::io::Read for RemoteSource {
288 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
289 if self.media_buffer.len() as u64 - self.offset < self.forward_buffer_len {
291 let mut buffer = [0; 1024];
292 let read_bytes = task::block_on(async {
293 match self.reader.read_exact(&mut buffer).await {
294 Ok(_) => {
295 self.media_buffer.extend_from_slice(&buffer);
296 return Ok(());
297 },
298 Err(err) => return Err(err),
299 }
300 });
301 match read_bytes {
302 Err(err) => return Err(err),
303 _ => {},
304 }
305 }
306 let mut bytes_read = 0;
308 for location in 0..1024 {
309 if (location + self.offset as usize) < self.media_buffer.len() {
310 buf[location] = self.media_buffer[location + self.offset as usize];
311 bytes_read += 1;
312 }
313 }
314
315 self.offset += bytes_read;
316 return Ok(bytes_read as usize);
317 }
318}
319
320impl std::io::Seek for RemoteSource {
321 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
324 match pos {
325 SeekFrom::Start(pos) => {
327 if pos > self.media_buffer.len() as u64{
328 self.offset = self.media_buffer.len() as u64;
329 } else {
330 self.offset = pos;
331 }
332 return Ok(self.offset);
333 },
334 SeekFrom::End(pos) => {
336 if self.media_buffer.len() as u64 + pos as u64 > self.media_buffer.len() as u64 {
337 self.offset = self.media_buffer.len() as u64;
338 } else {
339 self.offset = self.media_buffer.len() as u64 + pos as u64;
340 }
341 return Ok(self.offset);
342 },
343 SeekFrom::Current(pos) => {
345 if self.offset + pos as u64 > self.media_buffer.len() as u64{
346 self.offset = self.media_buffer.len() as u64;
347 } else {
348 self.offset += pos as u64
349 }
350 return Ok(self.offset);
351 },
352 }
353 }
354}
355
356impl MediaSource for RemoteSource {
357 fn is_seekable(&self) -> bool {
358 return true;
359 }
360
361 fn byte_len(&self) -> Option<u64> {
362 return None;
363 }
364}