librespot_audio/fetch/
mod.rs

1mod receive;
2
3use std::{
4    cmp::min,
5    fs,
6    io::{self, Read, Seek, SeekFrom},
7    sync::{
8        Arc, OnceLock,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    time::Duration,
12};
13
14use futures_util::{StreamExt, TryFutureExt, future::IntoStream};
15use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE};
16use hyper_util::client::legacy::ResponseFuture;
17use parking_lot::{Condvar, Mutex};
18use tempfile::NamedTempFile;
19use thiserror::Error;
20use tokio::sync::{Semaphore, mpsc, oneshot};
21
22use librespot_core::{Error, FileId, Session, cdn_url::CdnUrl};
23
24use self::receive::audio_file_fetch;
25
26use crate::range_set::{Range, RangeSet};
27
/// Result of an audio-file operation that yields no value on success and
/// reports failures as a [`librespot_core::Error`].
pub type AudioFileResult = Result<(), librespot_core::Error>;
29
/// Errors that can occur while fetching or streaming an audio file.
#[derive(Error, Debug)]
pub enum AudioFileError {
    /// The other end of a channel was dropped.
    // NOTE(review): not raised in the code visible here; presumably used by the
    // `receive` submodule — confirm.
    #[error("other end of channel disconnected")]
    Channel,
    /// A required HTTP header (e.g. `Content-Range`) was missing from a response.
    #[error("required header not found")]
    Header,
    /// The response stream ended without yielding any item.
    #[error("streamer received no data")]
    NoData,
    /// No output was available.
    // NOTE(review): not raised in the code visible here — confirm where it is used.
    #[error("no output available")]
    Output,
    /// The server answered with an unexpected HTTP status code.
    #[error("invalid status code {0}")]
    StatusCode(StatusCode),
    /// Waiting for a response or for downloaded data exceeded the timeout.
    #[error("wait timeout exceeded")]
    WaitTimeout,
}
45
46impl From<AudioFileError> for Error {
47    fn from(err: AudioFileError) -> Self {
48        match err {
49            AudioFileError::Channel => Error::aborted(err),
50            AudioFileError::Header => Error::unavailable(err),
51            AudioFileError::NoData => Error::unavailable(err),
52            AudioFileError::Output => Error::aborted(err),
53            AudioFileError::StatusCode(_) => Error::failed_precondition(err),
54            AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
55        }
56    }
57}
58
/// Tunable parameters that control how audio data is requested and how long
/// callers wait for it.
///
/// A process-wide instance is obtained with [`AudioFetchParams::get`] and can be
/// overridden once with [`AudioFetchParams::set`].
#[derive(Clone)]
pub struct AudioFetchParams {
    /// The minimum size of a block that is requested from the Spotify servers in one request.
    /// This is the block size that is typically requested while doing a `seek()` on a file.
    /// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
    /// Note: smaller requests can happen if part of the block is downloaded already.
    pub minimum_download_size: usize,

    /// The minimum network throughput that we expect. Together with the minimum download size,
    /// this will determine the time we will wait for a response.
    pub minimum_throughput: usize,

    /// The ping time that is used for calculations before a ping time was actually measured.
    pub initial_ping_time_estimate: Duration,

    /// If the measured ping time to the Spotify server is larger than this value, it is capped
    /// to avoid run-away block sizes and pre-fetching.
    pub maximum_assumed_ping_time: Duration,

    /// Before playback starts, this much audio (as a duration) must be present.
    /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
    /// of audio data may be larger or smaller.
    pub read_ahead_before_playback: Duration,

    /// While playing back, this much audio (as a duration) ahead of the current read position
    /// is requested.
    /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
    /// of audio data may be larger or smaller.
    pub read_ahead_during_playback: Duration,

    /// If the amount of data that is pending (requested but not received) is less than a certain amount,
    /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
    /// data is calculated as `<pending bytes> < prefetch_threshold_factor * <ping time> * <nominal data rate>`
    pub prefetch_threshold_factor: f32,

    /// The time we will wait to obtain status updates on downloading.
    pub download_timeout: Duration,
}
97
98impl Default for AudioFetchParams {
99    fn default() -> Self {
100        let minimum_download_size = 64 * 1024;
101        let minimum_throughput = 8 * 1024;
102        Self {
103            minimum_download_size,
104            minimum_throughput,
105            initial_ping_time_estimate: Duration::from_millis(500),
106            maximum_assumed_ping_time: Duration::from_millis(1500),
107            read_ahead_before_playback: Duration::from_secs(1),
108            read_ahead_during_playback: Duration::from_secs(5),
109            prefetch_threshold_factor: 4.0,
110            download_timeout: Duration::from_secs(
111                (minimum_download_size / minimum_throughput) as u64,
112            ),
113        }
114    }
115}
116
/// Process-wide fetch parameters. The cell is written at most once: either
/// explicitly through `AudioFetchParams::set`, or lazily with the defaults on
/// the first call to `AudioFetchParams::get`.
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();
118
119impl AudioFetchParams {
120    pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
121        AUDIO_FETCH_PARAMS.set(params)
122    }
123
124    pub fn get() -> &'static AudioFetchParams {
125        AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
126    }
127}
128
/// A readable, seekable audio file that is either served from the local cache
/// or streamed while it is being downloaded.
pub enum AudioFile {
    /// The complete file was found in the cache and is read directly from disk.
    Cached(fs::File),
    /// The file is being downloaded; reads may block until data arrives.
    Streaming(AudioFileStreaming),
}
133
/// A single in-flight range request against the CDN.
#[derive(Debug)]
pub struct StreamingRequest {
    /// Stream wrapping the pending HTTP response future.
    streamer: IntoStream<ResponseFuture>,
    /// Response that was already pulled from `streamer`, if any. The initial
    /// request sets this because its headers are inspected before the request
    /// is handed on for processing.
    initial_response: Option<Response<Incoming>>,
    /// Byte offset within the file at which this request starts.
    offset: usize,
    /// Number of bytes covered by this request.
    length: usize,
}
141
/// Commands sent to the background stream loader task.
#[derive(Debug)]
pub enum StreamLoaderCommand {
    /// Signal the stream loader to fetch a range of the file.
    Fetch(Range),
    /// Terminate and don't load any more data.
    Close,
}
147
/// Handle used to query and steer the download of an [`AudioFile`].
///
/// For a cached file both optional fields are `None`; every request is then a
/// no-op and the file is treated as fully available.
#[derive(Clone)]
pub struct StreamLoaderController {
    /// Command channel to the stream loader; `None` for cached files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    /// State shared with the stream loader; `None` for cached files.
    stream_shared: Option<Arc<AudioFileShared>>,
    /// Total size of the file in bytes.
    file_size: usize,
}
154
155impl StreamLoaderController {
156    pub fn len(&self) -> usize {
157        self.file_size
158    }
159
160    pub fn is_empty(&self) -> bool {
161        self.file_size == 0
162    }
163
164    pub fn range_available(&self, range: Range) -> bool {
165        if let Some(ref shared) = self.stream_shared {
166            let download_status = shared.download_status.lock();
167
168            range.length
169                <= download_status
170                    .downloaded
171                    .contained_length_from_value(range.start)
172        } else {
173            range.length <= self.len() - range.start
174        }
175    }
176
177    pub fn range_to_end_available(&self) -> bool {
178        match self.stream_shared {
179            Some(ref shared) => {
180                let read_position = shared.read_position();
181                self.range_available(Range::new(read_position, self.len() - read_position))
182            }
183            None => true,
184        }
185    }
186
187    pub fn ping_time(&self) -> Option<Duration> {
188        self.stream_shared.as_ref().map(|shared| shared.ping_time())
189    }
190
191    fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
192        if let Some(ref channel) = self.channel_tx {
193            // Ignore the error in case the channel has been closed already.
194            // This means that the file was completely downloaded.
195            let _ = channel.send(command);
196        }
197    }
198
199    pub fn fetch(&self, range: Range) {
200        // signal the stream loader to fetch a range of the file
201        self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
202    }
203
204    pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
205        // signal the stream loader to tech a range of the file and block until it is loaded.
206
207        // ensure the range is within the file's bounds.
208        if range.start >= self.len() {
209            range.length = 0;
210        } else if range.end() > self.len() {
211            range.length = self.len() - range.start;
212        }
213
214        self.fetch(range);
215
216        if let Some(ref shared) = self.stream_shared {
217            let mut download_status = shared.download_status.lock();
218            let download_timeout = AudioFetchParams::get().download_timeout;
219
220            while range.length
221                > download_status
222                    .downloaded
223                    .contained_length_from_value(range.start)
224            {
225                if shared
226                    .cond
227                    .wait_for(&mut download_status, download_timeout)
228                    .timed_out()
229                {
230                    return Err(AudioFileError::WaitTimeout.into());
231                }
232
233                if range.length
234                    > (download_status
235                        .downloaded
236                        .union(&download_status.requested)
237                        .contained_length_from_value(range.start))
238                {
239                    // For some reason, the requested range is neither downloaded nor requested.
240                    // This could be due to a network error. Request it again.
241                    self.fetch(range);
242                }
243            }
244        }
245
246        Ok(())
247    }
248
249    pub fn fetch_next_and_wait(
250        &self,
251        request_length: usize,
252        wait_length: usize,
253    ) -> AudioFileResult {
254        match self.stream_shared {
255            Some(ref shared) => {
256                let start = shared.read_position();
257
258                let request_range = Range {
259                    start,
260                    length: request_length,
261                };
262                self.fetch(request_range);
263
264                let wait_range = Range {
265                    start,
266                    length: wait_length,
267                };
268                self.fetch_blocking(wait_range)
269            }
270            None => Ok(()),
271        }
272    }
273
274    pub fn set_random_access_mode(&self) {
275        // optimise download strategy for random access
276        if let Some(ref shared) = self.stream_shared {
277            shared.set_download_streaming(false)
278        }
279    }
280
281    pub fn set_stream_mode(&self) {
282        // optimise download strategy for streaming
283        if let Some(ref shared) = self.stream_shared {
284            shared.set_download_streaming(true)
285        }
286    }
287
288    pub fn close(&self) {
289        // terminate stream loading and don't load any more data for this file.
290        self.send_stream_loader_command(StreamLoaderCommand::Close);
291    }
292}
293
/// An audio file that is read from a temporary file while a background task
/// downloads its contents.
pub struct AudioFileStreaming {
    /// Read handle on the temporary file the download is written to.
    read_file: fs::File,
    /// Current read offset in bytes.
    position: u64,
    /// Channel used to ask the stream loader for more data.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
    /// State shared with the stream loader and with controllers.
    shared: Arc<AudioFileShared>,
}
300
/// Bookkeeping of which byte ranges of the file have been asked for and which
/// have arrived.
struct AudioFileDownloadStatus {
    /// Ranges that have been requested.
    // NOTE(review): presumably excludes ranges that already moved to
    // `downloaded` — confirm against the `receive` submodule.
    requested: RangeSet,
    /// Ranges whose data is available for reading.
    downloaded: RangeSet,
}
305
/// State shared between the reader, the controllers and the download task.
struct AudioFileShared {
    /// URL the file is streamed from.
    cdn_url: String,
    /// Total size of the file in bytes.
    file_size: usize,
    /// Nominal data rate of the file, used to convert durations into byte counts.
    bytes_per_second: usize,
    /// Condition variable waited on, together with `download_status`, by
    /// readers that need data which has not been downloaded yet.
    cond: Condvar,
    /// Requested and downloaded ranges, guarded by a mutex.
    download_status: Mutex<AudioFileDownloadStatus>,
    /// `true` while the download strategy is optimised for streaming,
    /// `false` for random access.
    download_streaming: AtomicBool,
    /// Semaphore created with a single permit.
    // NOTE(review): presumably limits concurrent downloads in the `receive`
    // submodule — confirm.
    download_slots: Semaphore,
    /// Last measured ping time in milliseconds; `0` means "not measured yet".
    ping_time_ms: AtomicUsize,
    /// Read position in bytes, as last published by the reader.
    read_position: AtomicUsize,
    /// Last recorded throughput value.
    // NOTE(review): unit is not visible here (presumably bytes per second) — confirm.
    throughput: AtomicUsize,
}
318
319impl AudioFileShared {
320    fn is_download_streaming(&self) -> bool {
321        self.download_streaming.load(Ordering::Acquire)
322    }
323
324    fn set_download_streaming(&self, streaming: bool) {
325        self.download_streaming.store(streaming, Ordering::Release)
326    }
327
328    fn ping_time(&self) -> Duration {
329        let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
330        if ping_time_ms > 0 {
331            Duration::from_millis(ping_time_ms as u64)
332        } else {
333            AudioFetchParams::get().initial_ping_time_estimate
334        }
335    }
336
337    fn set_ping_time(&self, duration: Duration) {
338        self.ping_time_ms
339            .store(duration.as_millis() as usize, Ordering::Release)
340    }
341
342    fn throughput(&self) -> usize {
343        self.throughput.load(Ordering::Acquire)
344    }
345
346    fn set_throughput(&self, throughput: usize) {
347        self.throughput.store(throughput, Ordering::Release)
348    }
349
350    fn read_position(&self) -> usize {
351        self.read_position.load(Ordering::Acquire)
352    }
353
354    fn set_read_position(&self, position: u64) {
355        self.read_position
356            .store(position as usize, Ordering::Release)
357    }
358}
359
360impl AudioFile {
361    pub async fn open(
362        session: &Session,
363        file_id: FileId,
364        bytes_per_second: usize,
365    ) -> Result<AudioFile, Error> {
366        if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
367            debug!("File {file_id} already in cache");
368            return Ok(AudioFile::Cached(file));
369        }
370
371        debug!("Downloading file {file_id}");
372
373        let (complete_tx, complete_rx) = oneshot::channel();
374
375        let streaming =
376            AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
377
378        let session_ = session.clone();
379        session.spawn(complete_rx.map_ok(move |mut file| {
380            debug!("Downloading file {file_id} complete");
381
382            if let Some(cache) = session_.cache() {
383                if let Some(cache_id) = cache.file_path(file_id) {
384                    if let Err(e) = cache.save_file(file_id, &mut file) {
385                        error!("Error caching file {file_id} to {cache_id:?}: {e}");
386                    } else {
387                        debug!("File {file_id} cached to {cache_id:?}");
388                    }
389                }
390            }
391        }));
392
393        Ok(AudioFile::Streaming(streaming.await?))
394    }
395
396    pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
397        let controller = match self {
398            AudioFile::Streaming(stream) => StreamLoaderController {
399                channel_tx: Some(stream.stream_loader_command_tx.clone()),
400                stream_shared: Some(stream.shared.clone()),
401                file_size: stream.shared.file_size,
402            },
403            AudioFile::Cached(file) => StreamLoaderController {
404                channel_tx: None,
405                stream_shared: None,
406                file_size: file.metadata()?.len() as usize,
407            },
408        };
409
410        Ok(controller)
411    }
412
413    pub fn is_cached(&self) -> bool {
414        matches!(self, AudioFile::Cached { .. })
415    }
416}
417
418impl AudioFileStreaming {
419    pub async fn open(
420        session: Session,
421        file_id: FileId,
422        complete_tx: oneshot::Sender<NamedTempFile>,
423        bytes_per_second: usize,
424    ) -> Result<AudioFileStreaming, Error> {
425        let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
426
427        let minimum_download_size = AudioFetchParams::get().minimum_download_size;
428
429        let mut response_streamer_url = None;
430        let urls = cdn_url.try_get_urls()?;
431        for url in &urls {
432            // When the audio file is really small, this `download_size` may turn out to be
433            // larger than the audio file we're going to stream later on. This is OK; requesting
434            // `Content-Range` > `Content-Length` will return the complete file with status code
435            // 206 Partial Content.
436            let mut streamer =
437                session
438                    .spclient()
439                    .stream_from_cdn(*url, 0, minimum_download_size)?;
440
441            // Get the first chunk with the headers to get the file size.
442            // The remainder of that chunk with possibly also a response body is then
443            // further processed in `audio_file_fetch`.
444            let streamer_result = tokio::time::timeout(Duration::from_secs(10), streamer.next())
445                .await
446                .map_err(|_| AudioFileError::WaitTimeout.into())
447                .and_then(|x| x.ok_or_else(|| AudioFileError::NoData.into()))
448                .and_then(|x| x.map_err(Error::from));
449
450            match streamer_result {
451                Ok(r) => {
452                    response_streamer_url = Some((r, streamer, url));
453                    break;
454                }
455                Err(e) => warn!("Fetching {url} failed with error {e:?}, trying next"),
456            }
457        }
458
459        let Some((response, streamer, url)) = response_streamer_url else {
460            return Err(Error::unavailable(format!(
461                "{} URLs failed, none left to try",
462                urls.len()
463            )));
464        };
465
466        trace!("Streaming from {url}");
467
468        let code = response.status();
469        if code != StatusCode::PARTIAL_CONTENT {
470            debug!("Opening audio file expected partial content but got: {code}");
471            return Err(AudioFileError::StatusCode(code).into());
472        }
473
474        let header_value = response
475            .headers()
476            .get(CONTENT_RANGE)
477            .ok_or(AudioFileError::Header)?;
478        let str_value = header_value.to_str()?;
479        let hyphen_index = str_value.find('-').unwrap_or_default();
480        let slash_index = str_value.find('/').unwrap_or_default();
481        let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
482        let file_size = str_value[slash_index + 1..].parse()?;
483
484        let initial_request = StreamingRequest {
485            streamer,
486            initial_response: Some(response),
487            offset: 0,
488            length: upper_bound + 1,
489        };
490
491        let shared = Arc::new(AudioFileShared {
492            cdn_url: url.to_string(),
493            file_size,
494            bytes_per_second,
495            cond: Condvar::new(),
496            download_status: Mutex::new(AudioFileDownloadStatus {
497                requested: RangeSet::new(),
498                downloaded: RangeSet::new(),
499            }),
500            download_streaming: AtomicBool::new(false),
501            download_slots: Semaphore::new(1),
502            ping_time_ms: AtomicUsize::new(0),
503            read_position: AtomicUsize::new(0),
504            throughput: AtomicUsize::new(0),
505        });
506
507        let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
508        write_file.as_file().set_len(file_size as u64)?;
509
510        let read_file = write_file.reopen()?;
511
512        let (stream_loader_command_tx, stream_loader_command_rx) =
513            mpsc::unbounded_channel::<StreamLoaderCommand>();
514
515        session.spawn(audio_file_fetch(
516            session.clone(),
517            shared.clone(),
518            initial_request,
519            write_file,
520            stream_loader_command_rx,
521            complete_tx,
522        ));
523
524        Ok(AudioFileStreaming {
525            read_file,
526            position: 0,
527            stream_loader_command_tx,
528            shared,
529        })
530    }
531}
532
533impl Read for AudioFileStreaming {
534    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
535        let offset = self.position as usize;
536
537        if offset >= self.shared.file_size {
538            return Ok(0);
539        }
540
541        let length = min(output.len(), self.shared.file_size - offset);
542        if length == 0 {
543            return Ok(0);
544        }
545
546        let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
547        let length_to_request = if self.shared.is_download_streaming() {
548            let length_to_request = length
549                + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
550                    as usize;
551
552            // Due to the read-ahead stuff, we potentially request more than the actual request demanded.
553            min(length_to_request, self.shared.file_size - offset)
554        } else {
555            length
556        };
557
558        let mut ranges_to_request = RangeSet::new();
559        ranges_to_request.add_range(&Range::new(offset, length_to_request));
560
561        let mut download_status = self.shared.download_status.lock();
562
563        ranges_to_request.subtract_range_set(&download_status.downloaded);
564        ranges_to_request.subtract_range_set(&download_status.requested);
565
566        for &range in ranges_to_request.iter() {
567            self.stream_loader_command_tx
568                .send(StreamLoaderCommand::Fetch(range))
569                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
570        }
571
572        let download_timeout = AudioFetchParams::get().download_timeout;
573        while !download_status.downloaded.contains(offset) {
574            if self
575                .shared
576                .cond
577                .wait_for(&mut download_status, download_timeout)
578                .timed_out()
579            {
580                return Err(io::Error::new(
581                    io::ErrorKind::TimedOut,
582                    Error::deadline_exceeded(AudioFileError::WaitTimeout),
583                ));
584            }
585        }
586        let available_length = download_status
587            .downloaded
588            .contained_length_from_value(offset);
589
590        drop(download_status);
591
592        self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
593        let read_len = min(length, available_length);
594        let read_len = self.read_file.read(&mut output[..read_len])?;
595
596        self.position += read_len as u64;
597        self.shared.set_read_position(self.position);
598
599        Ok(read_len)
600    }
601}
602
603impl Seek for AudioFileStreaming {
604    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
605        // If we are already at this position, we don't need to switch download mode.
606        // These checks and locks are less expensive than interrupting streaming.
607        let current_position = self.position as i64;
608        let requested_pos = match pos {
609            SeekFrom::Start(pos) => pos as i64,
610            SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
611            SeekFrom::Current(pos) => current_position + pos,
612        };
613        if requested_pos == current_position {
614            return Ok(current_position as u64);
615        }
616
617        // Again if we have already downloaded this part.
618        let available = self
619            .shared
620            .download_status
621            .lock()
622            .downloaded
623            .contains(requested_pos as usize);
624
625        let mut was_streaming = false;
626        if !available {
627            // Ensure random access mode if we need to download this part.
628            // Checking whether we are streaming now is a micro-optimization
629            // to save an atomic load.
630            was_streaming = self.shared.is_download_streaming();
631            if was_streaming {
632                self.shared.set_download_streaming(false);
633            }
634        }
635
636        self.position = self.read_file.seek(pos)?;
637        self.shared.set_read_position(self.position);
638
639        if !available && was_streaming {
640            self.shared.set_download_streaming(true);
641        }
642
643        Ok(self.position)
644    }
645}
646
647impl Read for AudioFile {
648    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
649        match *self {
650            AudioFile::Cached(ref mut file) => file.read(output),
651            AudioFile::Streaming(ref mut file) => file.read(output),
652        }
653    }
654}
655
656impl Seek for AudioFile {
657    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
658        match *self {
659            AudioFile::Cached(ref mut file) => file.seek(pos),
660            AudioFile::Streaming(ref mut file) => file.seek(pos),
661        }
662    }
663}