librespot_audio/fetch/
mod.rs

1mod receive;
2
3use std::{
4    cmp::min,
5    fs,
6    io::{self, Read, Seek, SeekFrom},
7    sync::{
8        atomic::{AtomicBool, AtomicUsize, Ordering},
9        Arc, OnceLock,
10    },
11    time::Duration,
12};
13
14use futures_util::{future::IntoStream, StreamExt, TryFutureExt};
15use hyper::{body::Incoming, header::CONTENT_RANGE, Response, StatusCode};
16use hyper_util::client::legacy::ResponseFuture;
17use parking_lot::{Condvar, Mutex};
18use tempfile::NamedTempFile;
19use thiserror::Error;
20use tokio::sync::{mpsc, oneshot, Semaphore};
21
22use librespot_core::{cdn_url::CdnUrl, Error, FileId, Session};
23
24use self::receive::audio_file_fetch;
25
26use crate::range_set::{Range, RangeSet};
27
28pub type AudioFileResult = Result<(), librespot_core::Error>;
29
30#[derive(Error, Debug)]
31pub enum AudioFileError {
32    #[error("other end of channel disconnected")]
33    Channel,
34    #[error("required header not found")]
35    Header,
36    #[error("streamer received no data")]
37    NoData,
38    #[error("no output available")]
39    Output,
40    #[error("invalid status code {0}")]
41    StatusCode(StatusCode),
42    #[error("wait timeout exceeded")]
43    WaitTimeout,
44}
45
46impl From<AudioFileError> for Error {
47    fn from(err: AudioFileError) -> Self {
48        match err {
49            AudioFileError::Channel => Error::aborted(err),
50            AudioFileError::Header => Error::unavailable(err),
51            AudioFileError::NoData => Error::unavailable(err),
52            AudioFileError::Output => Error::aborted(err),
53            AudioFileError::StatusCode(_) => Error::failed_precondition(err),
54            AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
55        }
56    }
57}
58
59#[derive(Clone)]
60pub struct AudioFetchParams {
61    /// The minimum size of a block that is requested from the Spotify servers in one request.
62    /// This is the block size that is typically requested while doing a `seek()` on a file.
63    /// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
64    /// Note: smaller requests can happen if part of the block is downloaded already.
65    pub minimum_download_size: usize,
66
67    /// The minimum network throughput that we expect. Together with the minimum download size,
68    /// this will determine the time we will wait for a response.
69    pub minimum_throughput: usize,
70
71    /// The ping time that is used for calculations before a ping time was actually measured.
72    pub initial_ping_time_estimate: Duration,
73
74    /// If the measured ping time to the Spotify server is larger than this value, it is capped
75    /// to avoid run-away block sizes and pre-fetching.
76    pub maximum_assumed_ping_time: Duration,
77
78    /// Before playback starts, this many seconds of data must be present.
79    /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
80    /// of audio data may be larger or smaller.
81    pub read_ahead_before_playback: Duration,
82
83    /// While playing back, this many seconds of data ahead of the current read position are
84    /// requested.
85    /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
86    /// of audio data may be larger or smaller.
87    pub read_ahead_during_playback: Duration,
88
89    /// If the amount of data that is pending (requested but not received) is less than a certain amount,
90    /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
91    /// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
92    pub prefetch_threshold_factor: f32,
93
94    /// The time we will wait to obtain status updates on downloading.
95    pub download_timeout: Duration,
96}
97
98impl Default for AudioFetchParams {
99    fn default() -> Self {
100        let minimum_download_size = 64 * 1024;
101        let minimum_throughput = 8 * 1024;
102        Self {
103            minimum_download_size,
104            minimum_throughput,
105            initial_ping_time_estimate: Duration::from_millis(500),
106            maximum_assumed_ping_time: Duration::from_millis(1500),
107            read_ahead_before_playback: Duration::from_secs(1),
108            read_ahead_during_playback: Duration::from_secs(5),
109            prefetch_threshold_factor: 4.0,
110            download_timeout: Duration::from_secs(
111                (minimum_download_size / minimum_throughput) as u64,
112            ),
113        }
114    }
115}
116
117static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();
118
119impl AudioFetchParams {
120    pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
121        AUDIO_FETCH_PARAMS.set(params)
122    }
123
124    pub fn get() -> &'static AudioFetchParams {
125        AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
126    }
127}
128
129pub enum AudioFile {
130    Cached(fs::File),
131    Streaming(AudioFileStreaming),
132}
133
134#[derive(Debug)]
135pub struct StreamingRequest {
136    streamer: IntoStream<ResponseFuture>,
137    initial_response: Option<Response<Incoming>>,
138    offset: usize,
139    length: usize,
140}
141
142#[derive(Debug)]
143pub enum StreamLoaderCommand {
144    Fetch(Range), // signal the stream loader to fetch a range of the file
145    Close,        // terminate and don't load any more data
146}
147
148#[derive(Clone)]
149pub struct StreamLoaderController {
150    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
151    stream_shared: Option<Arc<AudioFileShared>>,
152    file_size: usize,
153}
154
155impl StreamLoaderController {
156    pub fn len(&self) -> usize {
157        self.file_size
158    }
159
160    pub fn is_empty(&self) -> bool {
161        self.file_size == 0
162    }
163
164    pub fn range_available(&self, range: Range) -> bool {
165        let available = if let Some(ref shared) = self.stream_shared {
166            let download_status = shared.download_status.lock();
167
168            range.length
169                <= download_status
170                    .downloaded
171                    .contained_length_from_value(range.start)
172        } else {
173            range.length <= self.len() - range.start
174        };
175
176        available
177    }
178
179    pub fn range_to_end_available(&self) -> bool {
180        match self.stream_shared {
181            Some(ref shared) => {
182                let read_position = shared.read_position();
183                self.range_available(Range::new(read_position, self.len() - read_position))
184            }
185            None => true,
186        }
187    }
188
189    pub fn ping_time(&self) -> Option<Duration> {
190        self.stream_shared.as_ref().map(|shared| shared.ping_time())
191    }
192
193    fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
194        if let Some(ref channel) = self.channel_tx {
195            // Ignore the error in case the channel has been closed already.
196            // This means that the file was completely downloaded.
197            let _ = channel.send(command);
198        }
199    }
200
201    pub fn fetch(&self, range: Range) {
202        // signal the stream loader to fetch a range of the file
203        self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
204    }
205
206    pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
207        // signal the stream loader to tech a range of the file and block until it is loaded.
208
209        // ensure the range is within the file's bounds.
210        if range.start >= self.len() {
211            range.length = 0;
212        } else if range.end() > self.len() {
213            range.length = self.len() - range.start;
214        }
215
216        self.fetch(range);
217
218        if let Some(ref shared) = self.stream_shared {
219            let mut download_status = shared.download_status.lock();
220            let download_timeout = AudioFetchParams::get().download_timeout;
221
222            while range.length
223                > download_status
224                    .downloaded
225                    .contained_length_from_value(range.start)
226            {
227                if shared
228                    .cond
229                    .wait_for(&mut download_status, download_timeout)
230                    .timed_out()
231                {
232                    return Err(AudioFileError::WaitTimeout.into());
233                }
234
235                if range.length
236                    > (download_status
237                        .downloaded
238                        .union(&download_status.requested)
239                        .contained_length_from_value(range.start))
240                {
241                    // For some reason, the requested range is neither downloaded nor requested.
242                    // This could be due to a network error. Request it again.
243                    self.fetch(range);
244                }
245            }
246        }
247
248        Ok(())
249    }
250
251    pub fn fetch_next_and_wait(
252        &self,
253        request_length: usize,
254        wait_length: usize,
255    ) -> AudioFileResult {
256        match self.stream_shared {
257            Some(ref shared) => {
258                let start = shared.read_position();
259
260                let request_range = Range {
261                    start,
262                    length: request_length,
263                };
264                self.fetch(request_range);
265
266                let wait_range = Range {
267                    start,
268                    length: wait_length,
269                };
270                self.fetch_blocking(wait_range)
271            }
272            None => Ok(()),
273        }
274    }
275
276    pub fn set_random_access_mode(&self) {
277        // optimise download strategy for random access
278        if let Some(ref shared) = self.stream_shared {
279            shared.set_download_streaming(false)
280        }
281    }
282
283    pub fn set_stream_mode(&self) {
284        // optimise download strategy for streaming
285        if let Some(ref shared) = self.stream_shared {
286            shared.set_download_streaming(true)
287        }
288    }
289
290    pub fn close(&self) {
291        // terminate stream loading and don't load any more data for this file.
292        self.send_stream_loader_command(StreamLoaderCommand::Close);
293    }
294}
295
296pub struct AudioFileStreaming {
297    read_file: fs::File,
298    position: u64,
299    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
300    shared: Arc<AudioFileShared>,
301}
302
303struct AudioFileDownloadStatus {
304    requested: RangeSet,
305    downloaded: RangeSet,
306}
307
308struct AudioFileShared {
309    cdn_url: CdnUrl,
310    file_size: usize,
311    bytes_per_second: usize,
312    cond: Condvar,
313    download_status: Mutex<AudioFileDownloadStatus>,
314    download_streaming: AtomicBool,
315    download_slots: Semaphore,
316    ping_time_ms: AtomicUsize,
317    read_position: AtomicUsize,
318    throughput: AtomicUsize,
319}
320
321impl AudioFileShared {
322    fn is_download_streaming(&self) -> bool {
323        self.download_streaming.load(Ordering::Acquire)
324    }
325
326    fn set_download_streaming(&self, streaming: bool) {
327        self.download_streaming.store(streaming, Ordering::Release)
328    }
329
330    fn ping_time(&self) -> Duration {
331        let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
332        if ping_time_ms > 0 {
333            Duration::from_millis(ping_time_ms as u64)
334        } else {
335            AudioFetchParams::get().initial_ping_time_estimate
336        }
337    }
338
339    fn set_ping_time(&self, duration: Duration) {
340        self.ping_time_ms
341            .store(duration.as_millis() as usize, Ordering::Release)
342    }
343
344    fn throughput(&self) -> usize {
345        self.throughput.load(Ordering::Acquire)
346    }
347
348    fn set_throughput(&self, throughput: usize) {
349        self.throughput.store(throughput, Ordering::Release)
350    }
351
352    fn read_position(&self) -> usize {
353        self.read_position.load(Ordering::Acquire)
354    }
355
356    fn set_read_position(&self, position: u64) {
357        self.read_position
358            .store(position as usize, Ordering::Release)
359    }
360}
361
362impl AudioFile {
363    pub async fn open(
364        session: &Session,
365        file_id: FileId,
366        bytes_per_second: usize,
367    ) -> Result<AudioFile, Error> {
368        if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
369            debug!("File {} already in cache", file_id);
370            return Ok(AudioFile::Cached(file));
371        }
372
373        debug!("Downloading file {}", file_id);
374
375        let (complete_tx, complete_rx) = oneshot::channel();
376
377        let streaming =
378            AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
379
380        let session_ = session.clone();
381        session.spawn(complete_rx.map_ok(move |mut file| {
382            debug!("Downloading file {} complete", file_id);
383
384            if let Some(cache) = session_.cache() {
385                if let Some(cache_id) = cache.file_path(file_id) {
386                    if let Err(e) = cache.save_file(file_id, &mut file) {
387                        error!("Error caching file {} to {:?}: {}", file_id, cache_id, e);
388                    } else {
389                        debug!("File {} cached to {:?}", file_id, cache_id);
390                    }
391                }
392            }
393        }));
394
395        Ok(AudioFile::Streaming(streaming.await?))
396    }
397
398    pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
399        let controller = match self {
400            AudioFile::Streaming(ref stream) => StreamLoaderController {
401                channel_tx: Some(stream.stream_loader_command_tx.clone()),
402                stream_shared: Some(stream.shared.clone()),
403                file_size: stream.shared.file_size,
404            },
405            AudioFile::Cached(ref file) => StreamLoaderController {
406                channel_tx: None,
407                stream_shared: None,
408                file_size: file.metadata()?.len() as usize,
409            },
410        };
411
412        Ok(controller)
413    }
414
415    pub fn is_cached(&self) -> bool {
416        matches!(self, AudioFile::Cached { .. })
417    }
418}
419
420impl AudioFileStreaming {
421    pub async fn open(
422        session: Session,
423        file_id: FileId,
424        complete_tx: oneshot::Sender<NamedTempFile>,
425        bytes_per_second: usize,
426    ) -> Result<AudioFileStreaming, Error> {
427        let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
428
429        if let Ok(url) = cdn_url.try_get_url() {
430            trace!("Streaming from {}", url);
431        }
432
433        let minimum_download_size = AudioFetchParams::get().minimum_download_size;
434
435        // When the audio file is really small, this `download_size` may turn out to be
436        // larger than the audio file we're going to stream later on. This is OK; requesting
437        // `Content-Range` > `Content-Length` will return the complete file with status code
438        // 206 Partial Content.
439        let mut streamer =
440            session
441                .spclient()
442                .stream_from_cdn(&cdn_url, 0, minimum_download_size)?;
443
444        // Get the first chunk with the headers to get the file size.
445        // The remainder of that chunk with possibly also a response body is then
446        // further processed in `audio_file_fetch`.
447        let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
448
449        let code = response.status();
450        if code != StatusCode::PARTIAL_CONTENT {
451            debug!(
452                "Opening audio file expected partial content but got: {}",
453                code
454            );
455            return Err(AudioFileError::StatusCode(code).into());
456        }
457
458        let header_value = response
459            .headers()
460            .get(CONTENT_RANGE)
461            .ok_or(AudioFileError::Header)?;
462        let str_value = header_value.to_str()?;
463        let hyphen_index = str_value.find('-').unwrap_or_default();
464        let slash_index = str_value.find('/').unwrap_or_default();
465        let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
466        let file_size = str_value[slash_index + 1..].parse()?;
467
468        let initial_request = StreamingRequest {
469            streamer,
470            initial_response: Some(response),
471            offset: 0,
472            length: upper_bound + 1,
473        };
474
475        let shared = Arc::new(AudioFileShared {
476            cdn_url,
477            file_size,
478            bytes_per_second,
479            cond: Condvar::new(),
480            download_status: Mutex::new(AudioFileDownloadStatus {
481                requested: RangeSet::new(),
482                downloaded: RangeSet::new(),
483            }),
484            download_streaming: AtomicBool::new(false),
485            download_slots: Semaphore::new(1),
486            ping_time_ms: AtomicUsize::new(0),
487            read_position: AtomicUsize::new(0),
488            throughput: AtomicUsize::new(0),
489        });
490
491        let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
492        write_file.as_file().set_len(file_size as u64)?;
493
494        let read_file = write_file.reopen()?;
495
496        let (stream_loader_command_tx, stream_loader_command_rx) =
497            mpsc::unbounded_channel::<StreamLoaderCommand>();
498
499        session.spawn(audio_file_fetch(
500            session.clone(),
501            shared.clone(),
502            initial_request,
503            write_file,
504            stream_loader_command_rx,
505            complete_tx,
506        ));
507
508        Ok(AudioFileStreaming {
509            read_file,
510            position: 0,
511            stream_loader_command_tx,
512            shared,
513        })
514    }
515}
516
517impl Read for AudioFileStreaming {
518    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
519        let offset = self.position as usize;
520
521        if offset >= self.shared.file_size {
522            return Ok(0);
523        }
524
525        let length = min(output.len(), self.shared.file_size - offset);
526        if length == 0 {
527            return Ok(0);
528        }
529
530        let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
531        let length_to_request = if self.shared.is_download_streaming() {
532            let length_to_request = length
533                + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
534                    as usize;
535
536            // Due to the read-ahead stuff, we potentially request more than the actual request demanded.
537            min(length_to_request, self.shared.file_size - offset)
538        } else {
539            length
540        };
541
542        let mut ranges_to_request = RangeSet::new();
543        ranges_to_request.add_range(&Range::new(offset, length_to_request));
544
545        let mut download_status = self.shared.download_status.lock();
546
547        ranges_to_request.subtract_range_set(&download_status.downloaded);
548        ranges_to_request.subtract_range_set(&download_status.requested);
549
550        for &range in ranges_to_request.iter() {
551            self.stream_loader_command_tx
552                .send(StreamLoaderCommand::Fetch(range))
553                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
554        }
555
556        let download_timeout = AudioFetchParams::get().download_timeout;
557        while !download_status.downloaded.contains(offset) {
558            if self
559                .shared
560                .cond
561                .wait_for(&mut download_status, download_timeout)
562                .timed_out()
563            {
564                return Err(io::Error::new(
565                    io::ErrorKind::TimedOut,
566                    Error::deadline_exceeded(AudioFileError::WaitTimeout),
567                ));
568            }
569        }
570        let available_length = download_status
571            .downloaded
572            .contained_length_from_value(offset);
573
574        drop(download_status);
575
576        self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
577        let read_len = min(length, available_length);
578        let read_len = self.read_file.read(&mut output[..read_len])?;
579
580        self.position += read_len as u64;
581        self.shared.set_read_position(self.position);
582
583        Ok(read_len)
584    }
585}
586
587impl Seek for AudioFileStreaming {
588    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
589        // If we are already at this position, we don't need to switch download mode.
590        // These checks and locks are less expensive than interrupting streaming.
591        let current_position = self.position as i64;
592        let requested_pos = match pos {
593            SeekFrom::Start(pos) => pos as i64,
594            SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
595            SeekFrom::Current(pos) => current_position + pos,
596        };
597        if requested_pos == current_position {
598            return Ok(current_position as u64);
599        }
600
601        // Again if we have already downloaded this part.
602        let available = self
603            .shared
604            .download_status
605            .lock()
606            .downloaded
607            .contains(requested_pos as usize);
608
609        let mut was_streaming = false;
610        if !available {
611            // Ensure random access mode if we need to download this part.
612            // Checking whether we are streaming now is a micro-optimization
613            // to save an atomic load.
614            was_streaming = self.shared.is_download_streaming();
615            if was_streaming {
616                self.shared.set_download_streaming(false);
617            }
618        }
619
620        self.position = self.read_file.seek(pos)?;
621        self.shared.set_read_position(self.position);
622
623        if !available && was_streaming {
624            self.shared.set_download_streaming(true);
625        }
626
627        Ok(self.position)
628    }
629}
630
631impl Read for AudioFile {
632    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
633        match *self {
634            AudioFile::Cached(ref mut file) => file.read(output),
635            AudioFile::Streaming(ref mut file) => file.read(output),
636        }
637    }
638}
639
640impl Seek for AudioFile {
641    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
642        match *self {
643            AudioFile::Cached(ref mut file) => file.seek(pos),
644            AudioFile::Streaming(ref mut file) => file.seek(pos),
645        }
646    }
647}