music_player_audio/fetch/
mod.rs

1use anyhow::Error;
2use futures_util::{future::IntoStream, StreamExt, TryFutureExt};
3use hyper::{
4    client::ResponseFuture,
5    header::{self, CONTENT_RANGE},
6    Body, Response, StatusCode,
7};
8use log::debug;
9use music_player_settings::get_application_directory;
10use std::{
11    cmp::min,
12    env, fs,
13    io::{self, Read, Seek, SeekFrom},
14    path::Path,
15    sync::{
16        atomic::{AtomicBool, AtomicUsize, Ordering},
17        Arc,
18    },
19    time::Duration,
20};
21use symphonia::core::io::MediaSource;
22use thiserror::Error;
23
24use parking_lot::{Condvar, Mutex};
25use tempfile::NamedTempFile;
26use tokio::sync::{mpsc, oneshot, Semaphore};
27use url::Url;
28
29use self::{client::Client, receive::audio_file_fetch};
30
31use crate::{
32    fetch::cache::Cache,
33    range_set::{Range, RangeSet},
34};
35
36pub mod client;
37
38pub mod receive;
39
40pub mod cache;
41
42pub type AudioFileResult = Result<(), anyhow::Error>;
43
44pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024;
45
46pub const MINIMUM_THROUGHPUT: usize = 8 * 1024;
47
48pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1);
49
50pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5);
51
52pub const DOWNLOAD_TIMEOUT: Duration =
53    Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64);
54
55pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0;
56
57/// If the measured ping time to the Spotify server is larger than this value, it is capped
58/// to avoid run-away block sizes and pre-fetching.
59pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500);
60
61/// The ping time that is used for calculations before a ping time was actually measured.
62pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500);
63
64#[derive(Error, Debug)]
65pub enum AudioFileError {
66    #[error("other end of channel disconnected")]
67    Channel,
68    #[error("required header not found")]
69    Header,
70    #[error("streamer received no data")]
71    NoData,
72    #[error("no output available")]
73    Output,
74    #[error("invalid status code {0}")]
75    StatusCode(StatusCode),
76    #[error("wait timeout exceeded")]
77    WaitTimeout,
78}
79
80pub enum AudioFile {
81    Cached(fs::File),
82    Streaming(AudioFileStreaming),
83    Local(fs::File),
84}
85
86#[derive(Debug)]
87pub struct StreamingRequest {
88    streamer: IntoStream<ResponseFuture>,
89    initial_response: Option<Response<Body>>,
90    offset: usize,
91    length: usize,
92}
93
94#[derive(Clone)]
95pub struct StreamLoaderController {
96    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
97    stream_shared: Option<Arc<AudioFileShared>>,
98    file_size: usize,
99}
100
101impl StreamLoaderController {
102    pub fn len(&self) -> usize {
103        self.file_size
104    }
105
106    pub fn is_empty(&self) -> bool {
107        self.file_size == 0
108    }
109
110    pub fn range_available(&self, range: Range) -> bool {
111        let available = if let Some(ref shared) = self.stream_shared {
112            let download_status = shared.download_status.lock();
113
114            range.length
115                <= download_status
116                    .downloaded
117                    .contained_length_from_value(range.start)
118        } else {
119            range.length <= self.len() - range.start
120        };
121
122        available
123    }
124
125    pub fn range_to_end_available(&self) -> bool {
126        match self.stream_shared {
127            Some(ref shared) => {
128                let read_position = shared.read_position();
129                self.range_available(Range::new(read_position, self.len() - read_position))
130            }
131            None => true,
132        }
133    }
134
135    pub fn ping_time(&self) -> Option<Duration> {
136        self.stream_shared.as_ref().map(|shared| shared.ping_time())
137    }
138
139    fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
140        if let Some(ref channel) = self.channel_tx {
141            // Ignore the error in case the channel has been closed already.
142            // This means that the file was completely downloaded.
143            let _ = channel.send(command);
144        }
145    }
146
147    pub fn fetch(&self, range: Range) {
148        // signal the stream loader to fetch a range of the file
149        self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
150    }
151
152    pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
153        // signal the stream loader to tech a range of the file and block until it is loaded.
154
155        // ensure the range is within the file's bounds.
156        if range.start >= self.len() {
157            range.length = 0;
158        } else if range.end() > self.len() {
159            range.length = self.len() - range.start;
160        }
161
162        self.fetch(range);
163
164        if let Some(ref shared) = self.stream_shared {
165            let mut download_status = shared.download_status.lock();
166
167            while range.length
168                > download_status
169                    .downloaded
170                    .contained_length_from_value(range.start)
171            {
172                if shared
173                    .cond
174                    .wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
175                    .timed_out()
176                {
177                    return Err(AudioFileError::WaitTimeout.into());
178                }
179
180                if range.length
181                    > (download_status
182                        .downloaded
183                        .union(&download_status.requested)
184                        .contained_length_from_value(range.start))
185                {
186                    // For some reason, the requested range is neither downloaded nor requested.
187                    // This could be due to a network error. Request it again.
188                    self.fetch(range);
189                }
190            }
191        }
192
193        Ok(())
194    }
195
196    pub fn fetch_next_and_wait(
197        &self,
198        request_length: usize,
199        wait_length: usize,
200    ) -> AudioFileResult {
201        match self.stream_shared {
202            Some(ref shared) => {
203                let start = shared.read_position();
204
205                let request_range = Range {
206                    start,
207                    length: request_length,
208                };
209                self.fetch(request_range);
210
211                let wait_range = Range {
212                    start,
213                    length: wait_length,
214                };
215                self.fetch_blocking(wait_range)
216            }
217            None => Ok(()),
218        }
219    }
220
221    pub fn set_random_access_mode(&self) {
222        // optimise download strategy for random access
223        if let Some(ref shared) = self.stream_shared {
224            shared.set_download_streaming(false)
225        }
226    }
227
228    pub fn set_stream_mode(&self) {
229        // optimise download strategy for streaming
230        if let Some(ref shared) = self.stream_shared {
231            shared.set_download_streaming(true)
232        }
233    }
234
235    pub fn close(&self) {
236        // terminate stream loading and don't load any more data for this file.
237        self.send_stream_loader_command(StreamLoaderCommand::Close);
238    }
239
240    pub fn mime_type(&self) -> Option<String> {
241        if let Some(ref shared) = self.stream_shared {
242            shared.get_mime_type()
243        } else {
244            None
245        }
246    }
247}
248
249pub struct AudioFileStreaming {
250    read_file: fs::File,
251    position: u64,
252    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
253    shared: Arc<AudioFileShared>,
254}
255
256struct AudioFileDownloadStatus {
257    requested: RangeSet,
258    downloaded: RangeSet,
259}
260
261impl AudioFile {
262    pub async fn open(url: &str, bytes_per_second: usize) -> Result<AudioFile, Error> {
263        if Url::parse(url).is_err() {
264            return Ok(AudioFile::Local(fs::File::open(url)?));
265        }
266
267        let cache = Cache::new();
268        let file_id = format!("{:x}", md5::compute(url.to_owned()));
269        if cache.is_file_cached(file_id.as_str()) {
270            println!("File is cached: {}", file_id);
271            debug!(">> File is cached: {}", file_id);
272            return Ok(AudioFile::Cached(cache.open_file(file_id.as_str())?));
273        }
274
275        let (complete_tx, complete_rx) = oneshot::channel();
276
277        let streaming = AudioFileStreaming::open(url.to_owned(), complete_tx, bytes_per_second);
278
279        let file_id = format!("{:x}", md5::compute(url.to_owned()));
280
281        // spawn a task to download the file
282        tokio::spawn(complete_rx.map_ok(move |mut file| {
283            println!("Download complete: {}", file.path().display());
284            debug!(">> Download complete: {}", file.path().display());
285            let cache = Cache::new();
286            match cache.save_file(&file_id, &mut file) {
287                Ok(_) => {
288                    println!("Saved to cache: {}", file_id);
289                    debug!(">> Saved to cache: {}", file_id);
290                }
291                Err(e) => {
292                    println!("Failed to save to cache: {}", e);
293                    debug!(">> Failed to save to cache: {}", e);
294                }
295            }
296        }));
297
298        Ok(AudioFile::Streaming(streaming.await?))
299    }
300
301    pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
302        let controller = match self {
303            AudioFile::Streaming(ref stream) => StreamLoaderController {
304                channel_tx: Some(stream.stream_loader_command_tx.clone()),
305                stream_shared: Some(stream.shared.clone()),
306                file_size: stream.shared.file_size,
307            },
308            AudioFile::Cached(ref file) => StreamLoaderController {
309                channel_tx: None,
310                stream_shared: None,
311                file_size: file.metadata()?.len() as usize,
312            },
313            AudioFile::Local(ref file) => StreamLoaderController {
314                channel_tx: None,
315                stream_shared: None,
316                file_size: file.metadata()?.len() as usize,
317            },
318        };
319
320        Ok(controller)
321    }
322
323    pub fn is_cached(&self) -> bool {
324        matches!(self, AudioFile::Cached { .. })
325    }
326
327    pub fn is_local(&self) -> bool {
328        matches!(self, AudioFile::Local { .. })
329    }
330
331    pub async fn get_mime_type(url: &str) -> Result<String, Error> {
332        if Url::parse(url).is_err() {
333            if !Path::new(url).exists() {
334                return Err(Error::msg("File does not exist"));
335            }
336            match mime_guess::from_path(url).first() {
337                Some(mime) => return Ok(mime.to_string()),
338                None => return Err(Error::msg("No mime type found")),
339            }
340        }
341        let mut streamer = Client::new().stream_from_url(url, 0, 512)?;
342        let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
343
344        let content_type = match response.headers().get(header::CONTENT_TYPE) {
345            Some(content_type) => content_type,
346            None => return Err(Error::msg("No Content-Type header")),
347        };
348
349        let mime = content_type.to_str()?;
350
351        Ok(mime.to_owned())
352    }
353}
354
355impl AudioFileStreaming {
356    pub async fn open(
357        url: String,
358        complete_tx: oneshot::Sender<NamedTempFile>,
359        bytes_per_second: usize,
360    ) -> Result<AudioFileStreaming, Error> {
361        // When the audio file is really small, this `download_size` may turn out to be
362        // larger than the audio file we're going to stream later on. This is OK; requesting
363        // `Content-Range` > `Content-Length` will return the complete file with status code
364        // 206 Partial Content.
365
366        debug!(">> Downloading file: {}", url);
367        let mut streamer = Client::new().stream_from_url(url.as_str(), 0, MINIMUM_DOWNLOAD_SIZE)?;
368
369        // Get the first chunk with the headers to get the file size.
370        // The remainder of that chunk with possibly also a response body is then
371        // further processed in `audio_file_fetch`.
372        let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
373
374        debug!(">> Got response: {:?}", response);
375
376        let code = response.status();
377        if code != StatusCode::PARTIAL_CONTENT {
378            return Err(AudioFileError::StatusCode(code).into());
379        }
380
381        let header_value = response
382            .headers()
383            .get(CONTENT_RANGE)
384            .ok_or(AudioFileError::Header)?;
385        let str_value = header_value.to_str()?;
386        let hyphen_index = str_value.find('-').unwrap_or_default();
387        let slash_index = str_value.find('/').unwrap_or_default();
388        let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
389        let file_size = str_value[slash_index + 1..].parse()?;
390
391        let content_type = match response.headers().get(header::CONTENT_TYPE) {
392            Some(content_type) => content_type,
393            None => return Err(Error::msg("No Content-Type header")),
394        };
395
396        let mime = content_type.to_str()?;
397        let mime = mime.to_owned();
398
399        let initial_request = StreamingRequest {
400            streamer,
401            initial_response: Some(response),
402            offset: 0,
403            length: upper_bound + 1,
404        };
405
406        let shared = Arc::new(AudioFileShared {
407            url,
408            file_size,
409            bytes_per_second,
410            cond: Condvar::new(),
411            download_status: Mutex::new(AudioFileDownloadStatus {
412                requested: RangeSet::new(),
413                downloaded: RangeSet::new(),
414            }),
415            download_streaming: AtomicBool::new(false),
416            download_slots: Semaphore::new(1),
417            ping_time_ms: AtomicUsize::new(0),
418            read_position: AtomicUsize::new(0),
419            throughput: AtomicUsize::new(0),
420            mime_type: mime,
421        });
422
423        let app_dir = get_application_directory();
424
425        debug!(">> Creating temp file in {:?}", app_dir);
426
427        let write_file = match env::consts::OS {
428            "android" => NamedTempFile::new_in(app_dir)?,
429            _ => NamedTempFile::new()?,
430        };
431        debug!(">> Created temp file: {:?}", write_file.path());
432        write_file.as_file().set_len(file_size as u64)?;
433
434        let read_file = write_file.reopen()?;
435
436        let (stream_loader_command_tx, stream_loader_command_rx) =
437            mpsc::unbounded_channel::<StreamLoaderCommand>();
438
439        tokio::spawn(audio_file_fetch(
440            shared.clone(),
441            initial_request,
442            write_file,
443            stream_loader_command_rx,
444            complete_tx,
445        ));
446
447        Ok(AudioFileStreaming {
448            read_file,
449            position: 0,
450            stream_loader_command_tx,
451            shared,
452        })
453    }
454}
455
456impl Read for AudioFileStreaming {
457    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
458        let offset = self.position as usize;
459
460        if offset >= self.shared.file_size {
461            return Ok(0);
462        }
463
464        let length = min(output.len(), self.shared.file_size - offset);
465        if length == 0 {
466            return Ok(0);
467        }
468
469        let length_to_request = if self.shared.is_download_streaming() {
470            let length_to_request = length
471                + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32)
472                    as usize;
473
474            // Due to the read-ahead stuff, we potentially request more than the actual request demanded.
475            min(length_to_request, self.shared.file_size - offset)
476        } else {
477            length
478        };
479
480        let mut ranges_to_request = RangeSet::new();
481        ranges_to_request.add_range(&Range::new(offset, length_to_request));
482
483        let mut download_status = self.shared.download_status.lock();
484
485        ranges_to_request.subtract_range_set(&download_status.downloaded);
486        ranges_to_request.subtract_range_set(&download_status.requested);
487
488        for &range in ranges_to_request.iter() {
489            self.stream_loader_command_tx
490                .send(StreamLoaderCommand::Fetch(range))
491                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
492        }
493
494        while !download_status.downloaded.contains(offset) {
495            if self
496                .shared
497                .cond
498                .wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
499                .timed_out()
500            {
501                return Err(io::Error::new(
502                    io::ErrorKind::TimedOut,
503                    Error::msg("Download timed out"),
504                ));
505            }
506        }
507        let available_length = download_status
508            .downloaded
509            .contained_length_from_value(offset);
510
511        drop(download_status);
512
513        self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
514        let read_len = min(length, available_length);
515        let read_len = self.read_file.read(&mut output[..read_len])?;
516
517        self.position += read_len as u64;
518        self.shared.set_read_position(self.position);
519
520        Ok(read_len)
521    }
522}
523
524impl Seek for AudioFileStreaming {
525    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
526        // If we are already at this position, we don't need to switch download mode.
527        // These checks and locks are less expensive than interrupting streaming.
528        let current_position = self.position as i64;
529        let requested_pos = match pos {
530            SeekFrom::Start(pos) => pos as i64,
531            SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
532            SeekFrom::Current(pos) => current_position + pos,
533        };
534        if requested_pos == current_position {
535            return Ok(current_position as u64);
536        }
537
538        // Again if we have already downloaded this part.
539        let available = self
540            .shared
541            .download_status
542            .lock()
543            .downloaded
544            .contains(requested_pos as usize);
545
546        let mut was_streaming = false;
547        if !available {
548            // Ensure random access mode if we need to download this part.
549            // Checking whether we are streaming now is a micro-optimization
550            // to save an atomic load.
551            was_streaming = self.shared.is_download_streaming();
552            if was_streaming {
553                self.shared.set_download_streaming(false);
554            }
555        }
556
557        self.position = self.read_file.seek(pos)?;
558        self.shared.set_read_position(self.position);
559
560        if !available && was_streaming {
561            self.shared.set_download_streaming(true);
562        }
563
564        Ok(self.position)
565    }
566}
567
568impl Read for AudioFile {
569    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
570        match *self {
571            AudioFile::Cached(ref mut file) => file.read(output),
572            AudioFile::Streaming(ref mut file) => file.read(output),
573            AudioFile::Local(ref mut file) => file.read(output),
574        }
575    }
576}
577
578impl Seek for AudioFile {
579    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
580        match *self {
581            AudioFile::Cached(ref mut file) => file.seek(pos),
582            AudioFile::Streaming(ref mut file) => file.seek(pos),
583            AudioFile::Local(ref mut file) => file.seek(pos),
584        }
585    }
586}
587
588#[derive(Debug)]
589pub enum StreamLoaderCommand {
590    Fetch(Range), // signal the stream loader to fetch a range of the file
591    Close,        // terminate and don't load any more data
592}
593
594struct AudioFileShared {
595    url: String,
596    file_size: usize,
597    bytes_per_second: usize,
598    cond: Condvar,
599    download_status: Mutex<AudioFileDownloadStatus>,
600    download_streaming: AtomicBool,
601    download_slots: Semaphore,
602    ping_time_ms: AtomicUsize,
603    read_position: AtomicUsize,
604    throughput: AtomicUsize,
605    mime_type: String,
606}
607
608impl AudioFileShared {
609    fn is_download_streaming(&self) -> bool {
610        self.download_streaming.load(Ordering::Acquire)
611    }
612
613    fn set_download_streaming(&self, streaming: bool) {
614        self.download_streaming.store(streaming, Ordering::Release)
615    }
616
617    fn ping_time(&self) -> Duration {
618        let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
619        if ping_time_ms > 0 {
620            Duration::from_millis(ping_time_ms as u64)
621        } else {
622            INITIAL_PING_TIME_ESTIMATE
623        }
624    }
625
626    fn set_ping_time(&self, duration: Duration) {
627        self.ping_time_ms
628            .store(duration.as_millis() as usize, Ordering::Release)
629    }
630
631    fn throughput(&self) -> usize {
632        self.throughput.load(Ordering::Acquire)
633    }
634
635    fn set_throughput(&self, throughput: usize) {
636        self.throughput.store(throughput, Ordering::Release)
637    }
638
639    fn read_position(&self) -> usize {
640        self.read_position.load(Ordering::Acquire)
641    }
642
643    fn set_read_position(&self, position: u64) {
644        self.read_position
645            .store(position as usize, Ordering::Release)
646    }
647
648    fn get_mime_type(&self) -> Option<String> {
649        if Url::parse(&self.url).is_err() {
650            if Path::new(&self.url).exists() {
651                match mime_guess::from_path(&self.url).first() {
652                    Some(mime) => {
653                        return Some(mime.to_string());
654                    }
655                    None => return None,
656                };
657            }
658        }
659        Some(format!("{}", self.mime_type))
660    }
661}
662
663pub struct Subfile<T: Read + Seek> {
664    stream: T,
665    offset: u64,
666    length: u64,
667}
668
669impl<T: Read + Seek> Subfile<T> {
670    pub fn new(mut stream: T, offset: u64, length: u64) -> Result<Subfile<T>, io::Error> {
671        let target = SeekFrom::Start(offset);
672        stream.seek(target)?;
673
674        Ok(Subfile {
675            stream,
676            offset,
677            length,
678        })
679    }
680}
681
682impl<T: Read + Seek> Read for Subfile<T> {
683    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
684        self.stream.read(buf)
685    }
686}
687
688impl<T: Read + Seek> Seek for Subfile<T> {
689    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
690        let pos = match pos {
691            SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
692            SeekFrom::End(offset) => {
693                if (self.length as i64 - offset) < self.offset as i64 {
694                    return Err(io::Error::new(
695                        io::ErrorKind::InvalidInput,
696                        "newpos would be < self.offset",
697                    ));
698                }
699                pos
700            }
701            _ => pos,
702        };
703
704        let newpos = self.stream.seek(pos)?;
705        Ok(newpos - self.offset)
706    }
707}
708
709impl<R> MediaSource for Subfile<R>
710where
711    R: Read + Seek + Send + Sync,
712{
713    fn is_seekable(&self) -> bool {
714        true
715    }
716
717    fn byte_len(&self) -> Option<u64> {
718        Some(self.length)
719    }
720}