librespot_audio/fetch/
mod.rs

1mod receive;
2
3use std::{
4    cmp::min,
5    fs,
6    io::{self, Read, Seek, SeekFrom},
7    sync::{
8        Arc, OnceLock,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11    sync::{Condvar, Mutex},
12    time::Duration,
13};
14
15use futures_util::{StreamExt, TryFutureExt, future::IntoStream};
16use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE};
17use hyper_util::client::legacy::ResponseFuture;
18
19use tempfile::NamedTempFile;
20use thiserror::Error;
21use tokio::sync::{Semaphore, mpsc, oneshot};
22
23use librespot_core::{Error, FileId, Session, cdn_url::CdnUrl};
24
25use self::receive::audio_file_fetch;
26
27use crate::range_set::{Range, RangeSet};
28
29pub type AudioFileResult = Result<(), librespot_core::Error>;
30
31const DOWNLOAD_STATUS_POISON_MSG: &str = "audio download status mutex should not be poisoned";
32
33#[derive(Error, Debug)]
34pub enum AudioFileError {
35    #[error("other end of channel disconnected")]
36    Channel,
37    #[error("required header not found")]
38    Header,
39    #[error("streamer received no data")]
40    NoData,
41    #[error("no output available")]
42    Output,
43    #[error("invalid status code {0}")]
44    StatusCode(StatusCode),
45    #[error("wait timeout exceeded")]
46    WaitTimeout,
47}
48
49impl From<AudioFileError> for Error {
50    fn from(err: AudioFileError) -> Self {
51        match err {
52            AudioFileError::Channel => Error::aborted(err),
53            AudioFileError::Header => Error::unavailable(err),
54            AudioFileError::NoData => Error::unavailable(err),
55            AudioFileError::Output => Error::aborted(err),
56            AudioFileError::StatusCode(_) => Error::failed_precondition(err),
57            AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
58        }
59    }
60}
61
62#[derive(Clone)]
63pub struct AudioFetchParams {
64    /// The minimum size of a block that is requested from the Spotify servers in one request.
65    /// This is the block size that is typically requested while doing a `seek()` on a file.
66    /// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
67    /// Note: smaller requests can happen if part of the block is downloaded already.
68    pub minimum_download_size: usize,
69
70    /// The minimum network throughput that we expect. Together with the minimum download size,
71    /// this will determine the time we will wait for a response.
72    pub minimum_throughput: usize,
73
74    /// The ping time that is used for calculations before a ping time was actually measured.
75    pub initial_ping_time_estimate: Duration,
76
77    /// If the measured ping time to the Spotify server is larger than this value, it is capped
78    /// to avoid run-away block sizes and pre-fetching.
79    pub maximum_assumed_ping_time: Duration,
80
81    /// Before playback starts, this many seconds of data must be present.
82    /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
83    /// of audio data may be larger or smaller.
84    pub read_ahead_before_playback: Duration,
85
86    /// While playing back, this many seconds of data ahead of the current read position are
87    /// requested.
88    /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
89    /// of audio data may be larger or smaller.
90    pub read_ahead_during_playback: Duration,
91
92    /// If the amount of data that is pending (requested but not received) is less than a certain amount,
93    /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
94    /// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
95    pub prefetch_threshold_factor: f32,
96
97    /// The time we will wait to obtain status updates on downloading.
98    pub download_timeout: Duration,
99}
100
101impl Default for AudioFetchParams {
102    fn default() -> Self {
103        let minimum_download_size = 64 * 1024;
104        let minimum_throughput = 8 * 1024;
105        Self {
106            minimum_download_size,
107            minimum_throughput,
108            initial_ping_time_estimate: Duration::from_millis(500),
109            maximum_assumed_ping_time: Duration::from_millis(1500),
110            read_ahead_before_playback: Duration::from_secs(1),
111            read_ahead_during_playback: Duration::from_secs(5),
112            prefetch_threshold_factor: 4.0,
113            download_timeout: Duration::from_secs(
114                (minimum_download_size / minimum_throughput) as u64,
115            ),
116        }
117    }
118}
119
120static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();
121
122impl AudioFetchParams {
123    pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
124        AUDIO_FETCH_PARAMS.set(params)
125    }
126
127    pub fn get() -> &'static AudioFetchParams {
128        AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
129    }
130}
131
132pub enum AudioFile {
133    Cached(fs::File),
134    Streaming(AudioFileStreaming),
135}
136
137#[derive(Debug)]
138pub struct StreamingRequest {
139    streamer: IntoStream<ResponseFuture>,
140    initial_response: Option<Response<Incoming>>,
141    offset: usize,
142    length: usize,
143}
144
145#[derive(Debug)]
146pub enum StreamLoaderCommand {
147    Fetch(Range), // signal the stream loader to fetch a range of the file
148    Close,        // terminate and don't load any more data
149}
150
151#[derive(Clone)]
152pub struct StreamLoaderController {
153    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
154    stream_shared: Option<Arc<AudioFileShared>>,
155    file_size: usize,
156}
157
158impl StreamLoaderController {
159    pub fn len(&self) -> usize {
160        self.file_size
161    }
162
163    pub fn is_empty(&self) -> bool {
164        self.file_size == 0
165    }
166
167    pub fn range_available(&self, range: Range) -> bool {
168        if let Some(ref shared) = self.stream_shared {
169            let download_status = shared
170                .download_status
171                .lock()
172                .expect(DOWNLOAD_STATUS_POISON_MSG);
173
174            range.length
175                <= download_status
176                    .downloaded
177                    .contained_length_from_value(range.start)
178        } else {
179            range.length <= self.len() - range.start
180        }
181    }
182
183    pub fn range_to_end_available(&self) -> bool {
184        match self.stream_shared {
185            Some(ref shared) => {
186                let read_position = shared.read_position();
187                self.range_available(Range::new(read_position, self.len() - read_position))
188            }
189            None => true,
190        }
191    }
192
193    pub fn ping_time(&self) -> Option<Duration> {
194        self.stream_shared.as_ref().map(|shared| shared.ping_time())
195    }
196
197    fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
198        if let Some(ref channel) = self.channel_tx {
199            // Ignore the error in case the channel has been closed already.
200            // This means that the file was completely downloaded.
201            let _ = channel.send(command);
202        }
203    }
204
205    pub fn fetch(&self, range: Range) {
206        // signal the stream loader to fetch a range of the file
207        self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
208    }
209
210    pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
211        // signal the stream loader to tech a range of the file and block until it is loaded.
212
213        // ensure the range is within the file's bounds.
214        if range.start >= self.len() {
215            range.length = 0;
216        } else if range.end() > self.len() {
217            range.length = self.len() - range.start;
218        }
219
220        self.fetch(range);
221
222        if let Some(ref shared) = self.stream_shared {
223            let mut download_status = shared
224                .download_status
225                .lock()
226                .expect(DOWNLOAD_STATUS_POISON_MSG);
227            let download_timeout = AudioFetchParams::get().download_timeout;
228
229            while range.length
230                > download_status
231                    .downloaded
232                    .contained_length_from_value(range.start)
233            {
234                let (new_download_status, wait_result) = shared
235                    .cond
236                    .wait_timeout(download_status, download_timeout)
237                    .expect(DOWNLOAD_STATUS_POISON_MSG);
238
239                download_status = new_download_status;
240                if wait_result.timed_out() {
241                    return Err(AudioFileError::WaitTimeout.into());
242                }
243
244                if range.length
245                    > (download_status
246                        .downloaded
247                        .union(&download_status.requested)
248                        .contained_length_from_value(range.start))
249                {
250                    // For some reason, the requested range is neither downloaded nor requested.
251                    // This could be due to a network error. Request it again.
252                    self.fetch(range);
253                }
254            }
255        }
256
257        Ok(())
258    }
259
260    pub fn fetch_next_and_wait(
261        &self,
262        request_length: usize,
263        wait_length: usize,
264    ) -> AudioFileResult {
265        match self.stream_shared {
266            Some(ref shared) => {
267                let start = shared.read_position();
268
269                let request_range = Range {
270                    start,
271                    length: request_length,
272                };
273                self.fetch(request_range);
274
275                let wait_range = Range {
276                    start,
277                    length: wait_length,
278                };
279                self.fetch_blocking(wait_range)
280            }
281            None => Ok(()),
282        }
283    }
284
285    pub fn set_random_access_mode(&self) {
286        // optimise download strategy for random access
287        if let Some(ref shared) = self.stream_shared {
288            shared.set_download_streaming(false)
289        }
290    }
291
292    pub fn set_stream_mode(&self) {
293        // optimise download strategy for streaming
294        if let Some(ref shared) = self.stream_shared {
295            shared.set_download_streaming(true)
296        }
297    }
298
299    pub fn close(&self) {
300        // terminate stream loading and don't load any more data for this file.
301        self.send_stream_loader_command(StreamLoaderCommand::Close);
302    }
303
304    pub fn from_local_file(file_size: u64) -> Self {
305        Self {
306            channel_tx: None,
307            stream_shared: None,
308            file_size: file_size as usize,
309        }
310    }
311}
312
313pub struct AudioFileStreaming {
314    read_file: fs::File,
315    position: u64,
316    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
317    shared: Arc<AudioFileShared>,
318}
319
320struct AudioFileDownloadStatus {
321    requested: RangeSet,
322    downloaded: RangeSet,
323}
324
325struct AudioFileShared {
326    cdn_url: String,
327    file_size: usize,
328    bytes_per_second: usize,
329    cond: Condvar,
330    download_status: Mutex<AudioFileDownloadStatus>,
331    download_streaming: AtomicBool,
332    download_slots: Semaphore,
333    ping_time_ms: AtomicUsize,
334    read_position: AtomicUsize,
335    throughput: AtomicUsize,
336}
337
338impl AudioFileShared {
339    fn is_download_streaming(&self) -> bool {
340        self.download_streaming.load(Ordering::Acquire)
341    }
342
343    fn set_download_streaming(&self, streaming: bool) {
344        self.download_streaming.store(streaming, Ordering::Release)
345    }
346
347    fn ping_time(&self) -> Duration {
348        let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
349        if ping_time_ms > 0 {
350            Duration::from_millis(ping_time_ms as u64)
351        } else {
352            AudioFetchParams::get().initial_ping_time_estimate
353        }
354    }
355
356    fn set_ping_time(&self, duration: Duration) {
357        self.ping_time_ms
358            .store(duration.as_millis() as usize, Ordering::Release)
359    }
360
361    fn throughput(&self) -> usize {
362        self.throughput.load(Ordering::Acquire)
363    }
364
365    fn set_throughput(&self, throughput: usize) {
366        self.throughput.store(throughput, Ordering::Release)
367    }
368
369    fn read_position(&self) -> usize {
370        self.read_position.load(Ordering::Acquire)
371    }
372
373    fn set_read_position(&self, position: u64) {
374        self.read_position
375            .store(position as usize, Ordering::Release)
376    }
377}
378
379impl AudioFile {
380    pub async fn open(
381        session: &Session,
382        file_id: FileId,
383        bytes_per_second: usize,
384    ) -> Result<AudioFile, Error> {
385        if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
386            debug!("File {file_id} already in cache");
387            return Ok(AudioFile::Cached(file));
388        }
389
390        debug!("Downloading file {file_id}");
391
392        let (complete_tx, complete_rx) = oneshot::channel();
393
394        let streaming =
395            AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
396
397        let session_ = session.clone();
398        session.spawn(complete_rx.map_ok(move |mut file| {
399            debug!("Downloading file {file_id} complete");
400
401            if let Some(cache) = session_.cache() {
402                if let Some(cache_id) = cache.file_path(file_id) {
403                    if let Err(e) = cache.save_file(file_id, &mut file) {
404                        error!("Error caching file {file_id} to {cache_id:?}: {e}");
405                    } else {
406                        debug!("File {file_id} cached to {cache_id:?}");
407                    }
408                }
409            }
410        }));
411
412        Ok(AudioFile::Streaming(streaming.await?))
413    }
414
415    pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
416        let controller = match self {
417            AudioFile::Streaming(stream) => StreamLoaderController {
418                channel_tx: Some(stream.stream_loader_command_tx.clone()),
419                stream_shared: Some(stream.shared.clone()),
420                file_size: stream.shared.file_size,
421            },
422            AudioFile::Cached(file) => StreamLoaderController {
423                channel_tx: None,
424                stream_shared: None,
425                file_size: file.metadata()?.len() as usize,
426            },
427        };
428
429        Ok(controller)
430    }
431
432    pub fn is_cached(&self) -> bool {
433        matches!(self, AudioFile::Cached { .. })
434    }
435}
436
437impl AudioFileStreaming {
438    pub async fn open(
439        session: Session,
440        file_id: FileId,
441        complete_tx: oneshot::Sender<NamedTempFile>,
442        bytes_per_second: usize,
443    ) -> Result<AudioFileStreaming, Error> {
444        let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
445
446        let minimum_download_size = AudioFetchParams::get().minimum_download_size;
447
448        let mut response_streamer_url = None;
449        let urls = cdn_url.try_get_urls()?;
450        for url in &urls {
451            // When the audio file is really small, this `download_size` may turn out to be
452            // larger than the audio file we're going to stream later on. This is OK; requesting
453            // `Content-Range` > `Content-Length` will return the complete file with status code
454            // 206 Partial Content.
455            let mut streamer =
456                session
457                    .spclient()
458                    .stream_from_cdn(*url, 0, minimum_download_size)?;
459
460            // Get the first chunk with the headers to get the file size.
461            // The remainder of that chunk with possibly also a response body is then
462            // further processed in `audio_file_fetch`.
463            let streamer_result = tokio::time::timeout(Duration::from_secs(10), streamer.next())
464                .await
465                .map_err(|_| AudioFileError::WaitTimeout.into())
466                .and_then(|x| x.ok_or_else(|| AudioFileError::NoData.into()))
467                .and_then(|x| x.map_err(Error::from));
468
469            match streamer_result {
470                Ok(r) => {
471                    response_streamer_url = Some((r, streamer, url));
472                    break;
473                }
474                Err(e) => warn!("Fetching {url} failed with error {e:?}, trying next"),
475            }
476        }
477
478        let Some((response, streamer, url)) = response_streamer_url else {
479            return Err(Error::unavailable(format!(
480                "{} URLs failed, none left to try",
481                urls.len()
482            )));
483        };
484
485        trace!("Streaming from {url}");
486
487        let code = response.status();
488        if code != StatusCode::PARTIAL_CONTENT {
489            debug!("Opening audio file expected partial content but got: {code}");
490            return Err(AudioFileError::StatusCode(code).into());
491        }
492
493        let header_value = response
494            .headers()
495            .get(CONTENT_RANGE)
496            .ok_or(AudioFileError::Header)?;
497        let str_value = header_value.to_str()?;
498        let hyphen_index = str_value.find('-').unwrap_or_default();
499        let slash_index = str_value.find('/').unwrap_or_default();
500        let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
501        let file_size = str_value[slash_index + 1..].parse()?;
502
503        let initial_request = StreamingRequest {
504            streamer,
505            initial_response: Some(response),
506            offset: 0,
507            length: upper_bound + 1,
508        };
509
510        let shared = Arc::new(AudioFileShared {
511            cdn_url: url.to_string(),
512            file_size,
513            bytes_per_second,
514            cond: Condvar::new(),
515            download_status: Mutex::new(AudioFileDownloadStatus {
516                requested: RangeSet::new(),
517                downloaded: RangeSet::new(),
518            }),
519            download_streaming: AtomicBool::new(false),
520            download_slots: Semaphore::new(1),
521            ping_time_ms: AtomicUsize::new(0),
522            read_position: AtomicUsize::new(0),
523            throughput: AtomicUsize::new(0),
524        });
525
526        let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
527        write_file.as_file().set_len(file_size as u64)?;
528
529        let read_file = write_file.reopen()?;
530
531        let (stream_loader_command_tx, stream_loader_command_rx) =
532            mpsc::unbounded_channel::<StreamLoaderCommand>();
533
534        session.spawn(audio_file_fetch(
535            session.clone(),
536            shared.clone(),
537            initial_request,
538            write_file,
539            stream_loader_command_rx,
540            complete_tx,
541        ));
542
543        Ok(AudioFileStreaming {
544            read_file,
545            position: 0,
546            stream_loader_command_tx,
547            shared,
548        })
549    }
550}
551
552impl Read for AudioFileStreaming {
553    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
554        let offset = self.position as usize;
555
556        if offset >= self.shared.file_size {
557            return Ok(0);
558        }
559
560        let length = min(output.len(), self.shared.file_size - offset);
561        if length == 0 {
562            return Ok(0);
563        }
564
565        let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
566        let length_to_request = if self.shared.is_download_streaming() {
567            let length_to_request = length
568                + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
569                    as usize;
570
571            // Due to the read-ahead stuff, we potentially request more than the actual request demanded.
572            min(length_to_request, self.shared.file_size - offset)
573        } else {
574            length
575        };
576
577        let mut ranges_to_request = RangeSet::new();
578        ranges_to_request.add_range(&Range::new(offset, length_to_request));
579
580        let mut download_status = self
581            .shared
582            .download_status
583            .lock()
584            .expect(DOWNLOAD_STATUS_POISON_MSG);
585
586        ranges_to_request.subtract_range_set(&download_status.downloaded);
587        ranges_to_request.subtract_range_set(&download_status.requested);
588
589        for &range in ranges_to_request.iter() {
590            self.stream_loader_command_tx
591                .send(StreamLoaderCommand::Fetch(range))
592                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
593        }
594
595        let download_timeout = AudioFetchParams::get().download_timeout;
596        while !download_status.downloaded.contains(offset) {
597            let (new_download_status, wait_result) = self
598                .shared
599                .cond
600                .wait_timeout(download_status, download_timeout)
601                .expect(DOWNLOAD_STATUS_POISON_MSG);
602
603            download_status = new_download_status;
604            if wait_result.timed_out() {
605                return Err(io::Error::new(
606                    io::ErrorKind::TimedOut,
607                    Error::deadline_exceeded(AudioFileError::WaitTimeout),
608                ));
609            }
610        }
611        let available_length = download_status
612            .downloaded
613            .contained_length_from_value(offset);
614
615        drop(download_status);
616
617        self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
618        let read_len = min(length, available_length);
619        let read_len = self.read_file.read(&mut output[..read_len])?;
620
621        self.position += read_len as u64;
622        self.shared.set_read_position(self.position);
623
624        Ok(read_len)
625    }
626}
627
628impl Seek for AudioFileStreaming {
629    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
630        // If we are already at this position, we don't need to switch download mode.
631        // These checks and locks are less expensive than interrupting streaming.
632        let current_position = self.position as i64;
633        let requested_pos = match pos {
634            SeekFrom::Start(pos) => pos as i64,
635            SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
636            SeekFrom::Current(pos) => current_position + pos,
637        };
638        if requested_pos == current_position {
639            return Ok(current_position as u64);
640        }
641
642        // Again if we have already downloaded this part.
643        let available = self
644            .shared
645            .download_status
646            .lock()
647            .expect(DOWNLOAD_STATUS_POISON_MSG)
648            .downloaded
649            .contains(requested_pos as usize);
650
651        let mut was_streaming = false;
652        if !available {
653            // Ensure random access mode if we need to download this part.
654            // Checking whether we are streaming now is a micro-optimization
655            // to save an atomic load.
656            was_streaming = self.shared.is_download_streaming();
657            if was_streaming {
658                self.shared.set_download_streaming(false);
659            }
660        }
661
662        self.position = self.read_file.seek(pos)?;
663        self.shared.set_read_position(self.position);
664
665        if !available && was_streaming {
666            self.shared.set_download_streaming(true);
667        }
668
669        Ok(self.position)
670    }
671}
672
673impl Read for AudioFile {
674    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
675        match *self {
676            AudioFile::Cached(ref mut file) => file.read(output),
677            AudioFile::Streaming(ref mut file) => file.read(output),
678        }
679    }
680}
681
682impl Seek for AudioFile {
683    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
684        match *self {
685            AudioFile::Cached(ref mut file) => file.seek(pos),
686            AudioFile::Streaming(ref mut file) => file.seek(pos),
687        }
688    }
689}