1use anyhow::Error;
2use futures_util::{future::IntoStream, StreamExt, TryFutureExt};
3use hyper::{
4 client::ResponseFuture,
5 header::{self, CONTENT_RANGE},
6 Body, Response, StatusCode,
7};
8use log::debug;
9use music_player_settings::get_application_directory;
10use std::{
11 cmp::min,
12 env, fs,
13 io::{self, Read, Seek, SeekFrom},
14 path::Path,
15 sync::{
16 atomic::{AtomicBool, AtomicUsize, Ordering},
17 Arc,
18 },
19 time::Duration,
20};
21use symphonia::core::io::MediaSource;
22use thiserror::Error;
23
24use parking_lot::{Condvar, Mutex};
25use tempfile::NamedTempFile;
26use tokio::sync::{mpsc, oneshot, Semaphore};
27use url::Url;
28
29use self::{client::Client, receive::audio_file_fetch};
30
31use crate::{
32 fetch::cache::Cache,
33 range_set::{Range, RangeSet},
34};
35
36pub mod client;
37
38pub mod receive;
39
40pub mod cache;
41
/// Result type returned by the blocking fetch methods of
/// [`StreamLoaderController`]; carries no value on success.
pub type AudioFileResult = Result<(), anyhow::Error>;

/// Number of bytes requested by the initial range request issued in
/// `AudioFileStreaming::open`. Also the numerator of [`DOWNLOAD_TIMEOUT`].
pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024;

/// Divisor used to derive [`DOWNLOAD_TIMEOUT`] from [`MINIMUM_DOWNLOAD_SIZE`].
/// NOTE(review): presumably a rate in bytes per second — confirm.
pub const MINIMUM_THROUGHPUT: usize = 8 * 1024;

/// NOTE(review): not referenced in the visible part of this file; presumably
/// the amount of audio to buffer before playback starts — confirm against users.
pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1);

/// Extra amount of audio, expressed as a duration, that
/// `AudioFileStreaming::read` requests beyond the bytes asked for while the
/// download is in streaming mode. It is converted to bytes by multiplying with
/// the `bytes_per_second` value given when the stream was opened.
pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5);

/// Maximum time a single wait on the download condition variable may last
/// before the waiting reader gives up. Evaluates to
/// `MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT` seconds (8 with the values above).
pub const DOWNLOAD_TIMEOUT: Duration =
    Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64);

/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by the fetch task to decide when to prefetch — confirm.
pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0;

/// NOTE(review): not referenced in the visible part of this file; presumably
/// an upper bound applied to measured ping times — confirm.
pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500);

/// Ping time reported by `AudioFileShared::ping_time` while the stored
/// measurement is still zero.
pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500);
63
/// Errors specific to fetching and streaming audio files.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum AudioFileError {
    /// The other end of a channel was dropped.
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("other end of channel disconnected")]
    Channel,
    /// A required response header was missing; `AudioFileStreaming::open`
    /// returns this when the response has no `Content-Range` header.
    #[error("required header not found")]
    Header,
    /// The response stream ended without yielding a response.
    #[error("streamer received no data")]
    NoData,
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("no output available")]
    Output,
    /// The server answered with an unexpected HTTP status;
    /// `AudioFileStreaming::open` requires `206 Partial Content`.
    #[error("invalid status code {0}")]
    StatusCode(StatusCode),
    /// A wait for downloaded data exceeded [`DOWNLOAD_TIMEOUT`].
    #[error("wait timeout exceeded")]
    WaitTimeout,
}
79
/// An audio source that can be read and seeked, regardless of where the bytes
/// come from.
pub enum AudioFile {
    /// A file opened from the download cache (see `AudioFile::open`).
    Cached(fs::File),
    /// A remote file that is being downloaded while it is read.
    Streaming(AudioFileStreaming),
    /// A file opened directly from the local filesystem; `AudioFile::open`
    /// picks this when its argument does not parse as a URL.
    Local(fs::File),
}
85
/// An in-flight range request handed to the fetch task.
#[derive(Debug)]
pub struct StreamingRequest {
    /// Stream wrapping the HTTP response future of the request.
    streamer: IntoStream<ResponseFuture>,
    /// Response that was already pulled from `streamer`, if any;
    /// `AudioFileStreaming::open` stores the first response here after
    /// inspecting its headers.
    initial_response: Option<Response<Body>>,
    /// Byte offset within the file at which the requested data starts.
    offset: usize,
    /// Number of bytes covered by the request.
    length: usize,
}
93
/// Cloneable handle for querying and steering the download of an [`AudioFile`].
///
/// For cached and local files both optional fields are `None` (see
/// `AudioFile::get_stream_loader_controller`) and every range counts as
/// already available.
#[derive(Clone)]
pub struct StreamLoaderController {
    /// Sender for commands to the fetch task; `None` when nothing is streamed.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    /// State shared with the streaming reader and the fetch task; `None` when
    /// nothing is streamed.
    stream_shared: Option<Arc<AudioFileShared>>,
    /// Total size of the audio file in bytes.
    file_size: usize,
}
100
101impl StreamLoaderController {
102 pub fn len(&self) -> usize {
103 self.file_size
104 }
105
106 pub fn is_empty(&self) -> bool {
107 self.file_size == 0
108 }
109
110 pub fn range_available(&self, range: Range) -> bool {
111 let available = if let Some(ref shared) = self.stream_shared {
112 let download_status = shared.download_status.lock();
113
114 range.length
115 <= download_status
116 .downloaded
117 .contained_length_from_value(range.start)
118 } else {
119 range.length <= self.len() - range.start
120 };
121
122 available
123 }
124
125 pub fn range_to_end_available(&self) -> bool {
126 match self.stream_shared {
127 Some(ref shared) => {
128 let read_position = shared.read_position();
129 self.range_available(Range::new(read_position, self.len() - read_position))
130 }
131 None => true,
132 }
133 }
134
135 pub fn ping_time(&self) -> Option<Duration> {
136 self.stream_shared.as_ref().map(|shared| shared.ping_time())
137 }
138
139 fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
140 if let Some(ref channel) = self.channel_tx {
141 let _ = channel.send(command);
144 }
145 }
146
147 pub fn fetch(&self, range: Range) {
148 self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
150 }
151
152 pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
153 if range.start >= self.len() {
157 range.length = 0;
158 } else if range.end() > self.len() {
159 range.length = self.len() - range.start;
160 }
161
162 self.fetch(range);
163
164 if let Some(ref shared) = self.stream_shared {
165 let mut download_status = shared.download_status.lock();
166
167 while range.length
168 > download_status
169 .downloaded
170 .contained_length_from_value(range.start)
171 {
172 if shared
173 .cond
174 .wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
175 .timed_out()
176 {
177 return Err(AudioFileError::WaitTimeout.into());
178 }
179
180 if range.length
181 > (download_status
182 .downloaded
183 .union(&download_status.requested)
184 .contained_length_from_value(range.start))
185 {
186 self.fetch(range);
189 }
190 }
191 }
192
193 Ok(())
194 }
195
196 pub fn fetch_next_and_wait(
197 &self,
198 request_length: usize,
199 wait_length: usize,
200 ) -> AudioFileResult {
201 match self.stream_shared {
202 Some(ref shared) => {
203 let start = shared.read_position();
204
205 let request_range = Range {
206 start,
207 length: request_length,
208 };
209 self.fetch(request_range);
210
211 let wait_range = Range {
212 start,
213 length: wait_length,
214 };
215 self.fetch_blocking(wait_range)
216 }
217 None => Ok(()),
218 }
219 }
220
221 pub fn set_random_access_mode(&self) {
222 if let Some(ref shared) = self.stream_shared {
224 shared.set_download_streaming(false)
225 }
226 }
227
228 pub fn set_stream_mode(&self) {
229 if let Some(ref shared) = self.stream_shared {
231 shared.set_download_streaming(true)
232 }
233 }
234
235 pub fn close(&self) {
236 self.send_stream_loader_command(StreamLoaderCommand::Close);
238 }
239
240 pub fn mime_type(&self) -> Option<String> {
241 if let Some(ref shared) = self.stream_shared {
242 shared.get_mime_type()
243 } else {
244 None
245 }
246 }
247}
248
/// Reader over a remote audio file that is downloaded into a temporary file
/// while it is being read.
pub struct AudioFileStreaming {
    /// Read handle onto the temporary file the fetch task writes into.
    read_file: fs::File,
    /// Current read position in bytes from the start of the file.
    position: u64,
    /// Sender used to request missing ranges from the fetch task.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
    /// State shared with the fetch task and with controllers.
    shared: Arc<AudioFileShared>,
}
255
/// Bookkeeping of which byte ranges of the file are on their way and which
/// have arrived. Kept behind the `download_status` mutex of `AudioFileShared`.
struct AudioFileDownloadStatus {
    /// Ranges that have been requested.
    /// NOTE(review): presumably excludes data that has already arrived —
    /// confirm against the fetch task.
    requested: RangeSet,
    /// Ranges whose data can be read; readers wait until their offset is
    /// contained here.
    downloaded: RangeSet,
}
260
261impl AudioFile {
262 pub async fn open(url: &str, bytes_per_second: usize) -> Result<AudioFile, Error> {
263 if Url::parse(url).is_err() {
264 return Ok(AudioFile::Local(fs::File::open(url)?));
265 }
266
267 let cache = Cache::new();
268 let file_id = format!("{:x}", md5::compute(url.to_owned()));
269 if cache.is_file_cached(file_id.as_str()) {
270 println!("File is cached: {}", file_id);
271 debug!(">> File is cached: {}", file_id);
272 return Ok(AudioFile::Cached(cache.open_file(file_id.as_str())?));
273 }
274
275 let (complete_tx, complete_rx) = oneshot::channel();
276
277 let streaming = AudioFileStreaming::open(url.to_owned(), complete_tx, bytes_per_second);
278
279 let file_id = format!("{:x}", md5::compute(url.to_owned()));
280
281 tokio::spawn(complete_rx.map_ok(move |mut file| {
283 println!("Download complete: {}", file.path().display());
284 debug!(">> Download complete: {}", file.path().display());
285 let cache = Cache::new();
286 match cache.save_file(&file_id, &mut file) {
287 Ok(_) => {
288 println!("Saved to cache: {}", file_id);
289 debug!(">> Saved to cache: {}", file_id);
290 }
291 Err(e) => {
292 println!("Failed to save to cache: {}", e);
293 debug!(">> Failed to save to cache: {}", e);
294 }
295 }
296 }));
297
298 Ok(AudioFile::Streaming(streaming.await?))
299 }
300
301 pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
302 let controller = match self {
303 AudioFile::Streaming(ref stream) => StreamLoaderController {
304 channel_tx: Some(stream.stream_loader_command_tx.clone()),
305 stream_shared: Some(stream.shared.clone()),
306 file_size: stream.shared.file_size,
307 },
308 AudioFile::Cached(ref file) => StreamLoaderController {
309 channel_tx: None,
310 stream_shared: None,
311 file_size: file.metadata()?.len() as usize,
312 },
313 AudioFile::Local(ref file) => StreamLoaderController {
314 channel_tx: None,
315 stream_shared: None,
316 file_size: file.metadata()?.len() as usize,
317 },
318 };
319
320 Ok(controller)
321 }
322
323 pub fn is_cached(&self) -> bool {
324 matches!(self, AudioFile::Cached { .. })
325 }
326
327 pub fn is_local(&self) -> bool {
328 matches!(self, AudioFile::Local { .. })
329 }
330
331 pub async fn get_mime_type(url: &str) -> Result<String, Error> {
332 if Url::parse(url).is_err() {
333 if !Path::new(url).exists() {
334 return Err(Error::msg("File does not exist"));
335 }
336 match mime_guess::from_path(url).first() {
337 Some(mime) => return Ok(mime.to_string()),
338 None => return Err(Error::msg("No mime type found")),
339 }
340 }
341 let mut streamer = Client::new().stream_from_url(url, 0, 512)?;
342 let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
343
344 let content_type = match response.headers().get(header::CONTENT_TYPE) {
345 Some(content_type) => content_type,
346 None => return Err(Error::msg("No Content-Type header")),
347 };
348
349 let mime = content_type.to_str()?;
350
351 Ok(mime.to_owned())
352 }
353}
354
355impl AudioFileStreaming {
356 pub async fn open(
357 url: String,
358 complete_tx: oneshot::Sender<NamedTempFile>,
359 bytes_per_second: usize,
360 ) -> Result<AudioFileStreaming, Error> {
361 debug!(">> Downloading file: {}", url);
367 let mut streamer = Client::new().stream_from_url(url.as_str(), 0, MINIMUM_DOWNLOAD_SIZE)?;
368
369 let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
373
374 debug!(">> Got response: {:?}", response);
375
376 let code = response.status();
377 if code != StatusCode::PARTIAL_CONTENT {
378 return Err(AudioFileError::StatusCode(code).into());
379 }
380
381 let header_value = response
382 .headers()
383 .get(CONTENT_RANGE)
384 .ok_or(AudioFileError::Header)?;
385 let str_value = header_value.to_str()?;
386 let hyphen_index = str_value.find('-').unwrap_or_default();
387 let slash_index = str_value.find('/').unwrap_or_default();
388 let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
389 let file_size = str_value[slash_index + 1..].parse()?;
390
391 let content_type = match response.headers().get(header::CONTENT_TYPE) {
392 Some(content_type) => content_type,
393 None => return Err(Error::msg("No Content-Type header")),
394 };
395
396 let mime = content_type.to_str()?;
397 let mime = mime.to_owned();
398
399 let initial_request = StreamingRequest {
400 streamer,
401 initial_response: Some(response),
402 offset: 0,
403 length: upper_bound + 1,
404 };
405
406 let shared = Arc::new(AudioFileShared {
407 url,
408 file_size,
409 bytes_per_second,
410 cond: Condvar::new(),
411 download_status: Mutex::new(AudioFileDownloadStatus {
412 requested: RangeSet::new(),
413 downloaded: RangeSet::new(),
414 }),
415 download_streaming: AtomicBool::new(false),
416 download_slots: Semaphore::new(1),
417 ping_time_ms: AtomicUsize::new(0),
418 read_position: AtomicUsize::new(0),
419 throughput: AtomicUsize::new(0),
420 mime_type: mime,
421 });
422
423 let app_dir = get_application_directory();
424
425 debug!(">> Creating temp file in {:?}", app_dir);
426
427 let write_file = match env::consts::OS {
428 "android" => NamedTempFile::new_in(app_dir)?,
429 _ => NamedTempFile::new()?,
430 };
431 debug!(">> Created temp file: {:?}", write_file.path());
432 write_file.as_file().set_len(file_size as u64)?;
433
434 let read_file = write_file.reopen()?;
435
436 let (stream_loader_command_tx, stream_loader_command_rx) =
437 mpsc::unbounded_channel::<StreamLoaderCommand>();
438
439 tokio::spawn(audio_file_fetch(
440 shared.clone(),
441 initial_request,
442 write_file,
443 stream_loader_command_rx,
444 complete_tx,
445 ));
446
447 Ok(AudioFileStreaming {
448 read_file,
449 position: 0,
450 stream_loader_command_tx,
451 shared,
452 })
453 }
454}
455
456impl Read for AudioFileStreaming {
457 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
458 let offset = self.position as usize;
459
460 if offset >= self.shared.file_size {
461 return Ok(0);
462 }
463
464 let length = min(output.len(), self.shared.file_size - offset);
465 if length == 0 {
466 return Ok(0);
467 }
468
469 let length_to_request = if self.shared.is_download_streaming() {
470 let length_to_request = length
471 + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32)
472 as usize;
473
474 min(length_to_request, self.shared.file_size - offset)
476 } else {
477 length
478 };
479
480 let mut ranges_to_request = RangeSet::new();
481 ranges_to_request.add_range(&Range::new(offset, length_to_request));
482
483 let mut download_status = self.shared.download_status.lock();
484
485 ranges_to_request.subtract_range_set(&download_status.downloaded);
486 ranges_to_request.subtract_range_set(&download_status.requested);
487
488 for &range in ranges_to_request.iter() {
489 self.stream_loader_command_tx
490 .send(StreamLoaderCommand::Fetch(range))
491 .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
492 }
493
494 while !download_status.downloaded.contains(offset) {
495 if self
496 .shared
497 .cond
498 .wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
499 .timed_out()
500 {
501 return Err(io::Error::new(
502 io::ErrorKind::TimedOut,
503 Error::msg("Download timed out"),
504 ));
505 }
506 }
507 let available_length = download_status
508 .downloaded
509 .contained_length_from_value(offset);
510
511 drop(download_status);
512
513 self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
514 let read_len = min(length, available_length);
515 let read_len = self.read_file.read(&mut output[..read_len])?;
516
517 self.position += read_len as u64;
518 self.shared.set_read_position(self.position);
519
520 Ok(read_len)
521 }
522}
523
524impl Seek for AudioFileStreaming {
525 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
526 let current_position = self.position as i64;
529 let requested_pos = match pos {
530 SeekFrom::Start(pos) => pos as i64,
531 SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
532 SeekFrom::Current(pos) => current_position + pos,
533 };
534 if requested_pos == current_position {
535 return Ok(current_position as u64);
536 }
537
538 let available = self
540 .shared
541 .download_status
542 .lock()
543 .downloaded
544 .contains(requested_pos as usize);
545
546 let mut was_streaming = false;
547 if !available {
548 was_streaming = self.shared.is_download_streaming();
552 if was_streaming {
553 self.shared.set_download_streaming(false);
554 }
555 }
556
557 self.position = self.read_file.seek(pos)?;
558 self.shared.set_read_position(self.position);
559
560 if !available && was_streaming {
561 self.shared.set_download_streaming(true);
562 }
563
564 Ok(self.position)
565 }
566}
567
568impl Read for AudioFile {
569 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
570 match *self {
571 AudioFile::Cached(ref mut file) => file.read(output),
572 AudioFile::Streaming(ref mut file) => file.read(output),
573 AudioFile::Local(ref mut file) => file.read(output),
574 }
575 }
576}
577
578impl Seek for AudioFile {
579 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
580 match *self {
581 AudioFile::Cached(ref mut file) => file.seek(pos),
582 AudioFile::Streaming(ref mut file) => file.seek(pos),
583 AudioFile::Local(ref mut file) => file.seek(pos),
584 }
585 }
586}
587
/// Commands sent over the unbounded channel to the fetch task.
#[derive(Debug)]
pub enum StreamLoaderCommand {
    /// Request download of the given byte range; sent by
    /// `StreamLoaderController::fetch` and by `AudioFileStreaming::read`.
    Fetch(Range),
    /// Sent by `StreamLoaderController::close`.
    /// NOTE(review): presumably stops the fetch task — confirm in `receive`.
    Close,
}
593
/// State shared between the streaming reader, its controllers and the fetch
/// task.
struct AudioFileShared {
    /// Location the file is streamed from.
    url: String,
    /// Total size of the file in bytes, taken from the `Content-Range` header.
    file_size: usize,
    /// Byte rate used to convert the read-ahead duration into bytes.
    bytes_per_second: usize,
    /// Condition variable readers wait on, together with `download_status`.
    /// NOTE(review): presumably notified by the fetch task when new data has
    /// been written — confirm in `receive`.
    cond: Condvar,
    /// Requested and downloaded ranges, guarded by a mutex.
    download_status: Mutex<AudioFileDownloadStatus>,
    /// `true` while in streaming mode, `false` in random-access mode.
    download_streaming: AtomicBool,
    /// Semaphore created with a single permit.
    /// NOTE(review): presumably limits concurrent downloads — confirm in
    /// `receive`.
    download_slots: Semaphore,
    /// Ping time in milliseconds; zero means no value has been stored yet.
    ping_time_ms: AtomicUsize,
    /// Position of the reader in bytes from the start of the file.
    read_position: AtomicUsize,
    /// Throughput value stored via `set_throughput`.
    /// NOTE(review): unit is not visible here — confirm.
    throughput: AtomicUsize,
    /// Value of the `Content-Type` header of the initial response.
    mime_type: String,
}
607
608impl AudioFileShared {
609 fn is_download_streaming(&self) -> bool {
610 self.download_streaming.load(Ordering::Acquire)
611 }
612
613 fn set_download_streaming(&self, streaming: bool) {
614 self.download_streaming.store(streaming, Ordering::Release)
615 }
616
617 fn ping_time(&self) -> Duration {
618 let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
619 if ping_time_ms > 0 {
620 Duration::from_millis(ping_time_ms as u64)
621 } else {
622 INITIAL_PING_TIME_ESTIMATE
623 }
624 }
625
626 fn set_ping_time(&self, duration: Duration) {
627 self.ping_time_ms
628 .store(duration.as_millis() as usize, Ordering::Release)
629 }
630
631 fn throughput(&self) -> usize {
632 self.throughput.load(Ordering::Acquire)
633 }
634
635 fn set_throughput(&self, throughput: usize) {
636 self.throughput.store(throughput, Ordering::Release)
637 }
638
639 fn read_position(&self) -> usize {
640 self.read_position.load(Ordering::Acquire)
641 }
642
643 fn set_read_position(&self, position: u64) {
644 self.read_position
645 .store(position as usize, Ordering::Release)
646 }
647
648 fn get_mime_type(&self) -> Option<String> {
649 if Url::parse(&self.url).is_err() {
650 if Path::new(&self.url).exists() {
651 match mime_guess::from_path(&self.url).first() {
652 Some(mime) => {
653 return Some(mime.to_string());
654 }
655 None => return None,
656 };
657 }
658 }
659 Some(format!("{}", self.mime_type))
660 }
661}
662
/// A view onto `stream` whose position 0 lies `offset` bytes into the
/// underlying stream.
pub struct Subfile<T: Read + Seek> {
    /// The underlying stream.
    stream: T,
    /// Position in `stream` that corresponds to position 0 of the subfile.
    offset: u64,
    /// Length supplied by the caller. The `SeekFrom::End` bounds check treats
    /// it as the end position of the underlying stream.
    /// NOTE(review): confirm that callers pass the full stream length.
    length: u64,
}

impl<T: Read + Seek> Subfile<T> {
    /// Wraps `stream` and positions it at `offset`.
    ///
    /// # Errors
    ///
    /// Returns the error of the initial seek on `stream`.
    pub fn new(mut stream: T, offset: u64, length: u64) -> Result<Subfile<T>, io::Error> {
        stream.seek(SeekFrom::Start(offset))?;

        Ok(Subfile {
            stream,
            offset,
            length,
        })
    }
}

impl<T: Read + Seek> Read for Subfile<T> {
    /// Reads from the underlying stream at its current position.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.stream.read(buf)
    }
}

impl<T: Read + Seek> Seek for Subfile<T> {
    /// Seeks within the subfile and returns the new position relative to the
    /// subfile start.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the target lies before the subfile start or
    /// when a start-relative target overflows `u64`, and otherwise the error
    /// of the underlying seek. When a `SeekFrom::Current` target turns out to
    /// lie before the subfile start, the underlying stream has already moved.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let pos = match pos {
            SeekFrom::Start(offset) => {
                let shifted = offset.checked_add(self.offset).ok_or_else(|| {
                    io::Error::new(io::ErrorKind::InvalidInput, "seek position overflows u64")
                })?;
                SeekFrom::Start(shifted)
            }
            SeekFrom::End(offset) => {
                // `End(n)` lands on `length + n` in the underlying stream.
                // Compare in i128 so neither the sum nor the casts can wrap.
                let target = i128::from(self.length) + i128::from(offset);
                if target < i128::from(self.offset) {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "newpos would be < self.offset",
                    ));
                }
                pos
            }
            SeekFrom::Current(_) => pos,
        };

        let newpos = self.stream.seek(pos)?;
        // A relative seek can still end up before the subfile start; report
        // that as an error instead of underflowing the subtraction.
        newpos.checked_sub(self.offset).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "newpos would be < self.offset")
        })
    }
}
708
709impl<R> MediaSource for Subfile<R>
710where
711 R: Read + Seek + Send + Sync,
712{
713 fn is_seekable(&self) -> bool {
714 true
715 }
716
717 fn byte_len(&self) -> Option<u64> {
718 Some(self.length)
719 }
720}