1mod receive;
2
3use std::{
4 cmp::min,
5 fs,
6 io::{self, Read, Seek, SeekFrom},
7 sync::{
8 Arc, OnceLock,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 sync::{Condvar, Mutex},
12 time::Duration,
13};
14
15use futures_util::{StreamExt, TryFutureExt, future::IntoStream};
16use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE};
17use hyper_util::client::legacy::ResponseFuture;
18
19use tempfile::NamedTempFile;
20use thiserror::Error;
21use tokio::sync::{Semaphore, mpsc, oneshot};
22
23use librespot_core::{Error, FileId, Session, cdn_url::CdnUrl};
24
25use self::receive::audio_file_fetch;
26
27use crate::range_set::{Range, RangeSet};
28
/// Result type of audio file operations in this module that yield no value on success and a
/// `librespot_core::Error` on failure.
pub type AudioFileResult = Result<(), librespot_core::Error>;

/// Panic message passed to `expect` whenever locking the download status mutex, or waiting on
/// the condition variable paired with it, reports a poisoned mutex.
const DOWNLOAD_STATUS_POISON_MSG: &str = "audio download status mutex should not be poisoned";
32
/// Errors raised while opening, fetching or reading an audio file.
///
/// The `From<AudioFileError> for Error` implementation in this file maps every variant onto a
/// `librespot_core::Error` category.
#[derive(Error, Debug)]
pub enum AudioFileError {
    /// The other end of a channel was disconnected (not raised in this file).
    #[error("other end of channel disconnected")]
    Channel,
    /// A required header is missing; raised here when a CDN response has no `Content-Range`.
    #[error("required header not found")]
    Header,
    /// The response stream ended without yielding anything.
    #[error("streamer received no data")]
    NoData,
    /// No output is available (not raised in this file).
    #[error("no output available")]
    Output,
    /// The CDN answered with an unexpected HTTP status; raised here when the initial response
    /// is not `206 Partial Content`.
    #[error("invalid status code {0}")]
    StatusCode(StatusCode),
    /// A wait for a response or for downloaded data exceeded its timeout.
    #[error("wait timeout exceeded")]
    WaitTimeout,
}
48
49impl From<AudioFileError> for Error {
50 fn from(err: AudioFileError) -> Self {
51 match err {
52 AudioFileError::Channel => Error::aborted(err),
53 AudioFileError::Header => Error::unavailable(err),
54 AudioFileError::NoData => Error::unavailable(err),
55 AudioFileError::Output => Error::aborted(err),
56 AudioFileError::StatusCode(_) => Error::failed_precondition(err),
57 AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
58 }
59 }
60}
61
/// Tunable parameters for fetching audio data.
///
/// A single process-wide instance is available through [`AudioFetchParams::get`]; it can be
/// replaced once, before first use, through [`AudioFetchParams::set`].
#[derive(Clone)]
pub struct AudioFetchParams {
    /// Size passed to the first CDN request when a streaming file is opened
    /// (presumably in bytes; defaults to 64 KiB).
    pub minimum_download_size: usize,

    /// Lowest throughput that is assumed. The default `download_timeout` is derived from it as
    /// `minimum_download_size / minimum_throughput` seconds, so this is presumably bytes/second.
    pub minimum_throughput: usize,

    /// Ping time reported as long as no measured ping time has been stored.
    pub initial_ping_time_estimate: Duration,

    /// Upper bound for the assumed ping time.
    /// NOTE(review): not consumed in this file — confirm exact semantics at the use site.
    pub maximum_assumed_ping_time: Duration,

    /// Amount of playback time to have read ahead before playback starts.
    /// NOTE(review): not consumed in this file — confirm exact semantics at the use site.
    pub read_ahead_before_playback: Duration,

    /// While streaming mode is active, every read additionally requests this much playback
    /// time worth of data, converted to bytes with the file's nominal `bytes_per_second`.
    pub read_ahead_during_playback: Duration,

    /// Factor applied to a prefetch threshold.
    /// NOTE(review): not consumed in this file — confirm exact semantics at the use site.
    pub prefetch_threshold_factor: f32,

    /// Longest time a blocking wait on the download status sleeps before it gives up with a
    /// timeout error.
    pub download_timeout: Duration,
}

impl Default for AudioFetchParams {
    /// Builds the default parameter set; the download timeout follows from the default
    /// download size and throughput.
    fn default() -> Self {
        const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024;
        const MINIMUM_THROUGHPUT: usize = 8 * 1024;

        // Whole seconds needed to move one minimum-size block at the minimum throughput.
        let timeout_secs = (MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64;

        Self {
            minimum_download_size: MINIMUM_DOWNLOAD_SIZE,
            minimum_throughput: MINIMUM_THROUGHPUT,
            initial_ping_time_estimate: Duration::from_millis(500),
            maximum_assumed_ping_time: Duration::from_millis(1500),
            read_ahead_before_playback: Duration::from_secs(1),
            read_ahead_during_playback: Duration::from_secs(5),
            prefetch_threshold_factor: 4.0,
            download_timeout: Duration::from_secs(timeout_secs),
        }
    }
}

/// Process-wide parameter storage; written at most once.
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();

impl AudioFetchParams {
    /// Installs `params` as the process-wide parameters.
    ///
    /// # Errors
    ///
    /// Hands `params` back when the parameters were already initialised, either by an earlier
    /// `set` or by a `get` that fell back to the defaults.
    pub fn set(params: Self) -> Result<(), Self> {
        AUDIO_FETCH_PARAMS.set(params)
    }

    /// Returns the process-wide parameters, initialising them with the defaults on first use.
    pub fn get() -> &'static Self {
        AUDIO_FETCH_PARAMS.get_or_init(Self::default)
    }
}
131
/// An audio file that is either served from the local cache or streamed from the CDN.
pub enum AudioFile {
    /// The file was found in the session cache and is read directly from disk.
    Cached(fs::File),
    /// The file is being downloaded; reads are served from the partially filled download.
    Streaming(AudioFileStreaming),
}
136
/// A CDN range request together with the byte range it covers.
#[derive(Debug)]
pub struct StreamingRequest {
    /// Stream wrapping the response future of the request.
    streamer: IntoStream<ResponseFuture>,
    /// Response that was already taken from `streamer`, if any; set for the initial request
    /// issued while opening a file.
    initial_response: Option<Response<Incoming>>,
    /// Offset of the first requested byte within the file.
    offset: usize,
    /// Number of bytes covered by the request.
    length: usize,
}
144
/// Commands sent over the stream loader command channel to the background fetch task.
#[derive(Debug)]
pub enum StreamLoaderCommand {
    /// Ask for the given byte range of the file to be fetched.
    Fetch(Range),
    /// Sent by `StreamLoaderController::close`; presumably stops further loading — the
    /// handling lives outside this file.
    Close,
}
150
/// Cloneable handle for querying and steering the download of an audio file.
///
/// For cached/local files both optional fields are `None` and every request is a no-op.
#[derive(Clone)]
pub struct StreamLoaderController {
    /// Sender for commands to the fetch task; `None` for local files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    /// State shared with the streaming file and the fetch task; `None` for local files.
    stream_shared: Option<Arc<AudioFileShared>>,
    /// Total size of the file in bytes.
    file_size: usize,
}
157
158impl StreamLoaderController {
159 pub fn len(&self) -> usize {
160 self.file_size
161 }
162
163 pub fn is_empty(&self) -> bool {
164 self.file_size == 0
165 }
166
167 pub fn range_available(&self, range: Range) -> bool {
168 if let Some(ref shared) = self.stream_shared {
169 let download_status = shared
170 .download_status
171 .lock()
172 .expect(DOWNLOAD_STATUS_POISON_MSG);
173
174 range.length
175 <= download_status
176 .downloaded
177 .contained_length_from_value(range.start)
178 } else {
179 range.length <= self.len() - range.start
180 }
181 }
182
183 pub fn range_to_end_available(&self) -> bool {
184 match self.stream_shared {
185 Some(ref shared) => {
186 let read_position = shared.read_position();
187 self.range_available(Range::new(read_position, self.len() - read_position))
188 }
189 None => true,
190 }
191 }
192
193 pub fn ping_time(&self) -> Option<Duration> {
194 self.stream_shared.as_ref().map(|shared| shared.ping_time())
195 }
196
197 fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
198 if let Some(ref channel) = self.channel_tx {
199 let _ = channel.send(command);
202 }
203 }
204
205 pub fn fetch(&self, range: Range) {
206 self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
208 }
209
210 pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
211 if range.start >= self.len() {
215 range.length = 0;
216 } else if range.end() > self.len() {
217 range.length = self.len() - range.start;
218 }
219
220 self.fetch(range);
221
222 if let Some(ref shared) = self.stream_shared {
223 let mut download_status = shared
224 .download_status
225 .lock()
226 .expect(DOWNLOAD_STATUS_POISON_MSG);
227 let download_timeout = AudioFetchParams::get().download_timeout;
228
229 while range.length
230 > download_status
231 .downloaded
232 .contained_length_from_value(range.start)
233 {
234 let (new_download_status, wait_result) = shared
235 .cond
236 .wait_timeout(download_status, download_timeout)
237 .expect(DOWNLOAD_STATUS_POISON_MSG);
238
239 download_status = new_download_status;
240 if wait_result.timed_out() {
241 return Err(AudioFileError::WaitTimeout.into());
242 }
243
244 if range.length
245 > (download_status
246 .downloaded
247 .union(&download_status.requested)
248 .contained_length_from_value(range.start))
249 {
250 self.fetch(range);
253 }
254 }
255 }
256
257 Ok(())
258 }
259
260 pub fn fetch_next_and_wait(
261 &self,
262 request_length: usize,
263 wait_length: usize,
264 ) -> AudioFileResult {
265 match self.stream_shared {
266 Some(ref shared) => {
267 let start = shared.read_position();
268
269 let request_range = Range {
270 start,
271 length: request_length,
272 };
273 self.fetch(request_range);
274
275 let wait_range = Range {
276 start,
277 length: wait_length,
278 };
279 self.fetch_blocking(wait_range)
280 }
281 None => Ok(()),
282 }
283 }
284
285 pub fn set_random_access_mode(&self) {
286 if let Some(ref shared) = self.stream_shared {
288 shared.set_download_streaming(false)
289 }
290 }
291
292 pub fn set_stream_mode(&self) {
293 if let Some(ref shared) = self.stream_shared {
295 shared.set_download_streaming(true)
296 }
297 }
298
299 pub fn close(&self) {
300 self.send_stream_loader_command(StreamLoaderCommand::Close);
302 }
303
304 pub fn from_local_file(file_size: u64) -> Self {
305 Self {
306 channel_tx: None,
307 stream_shared: None,
308 file_size: file_size as usize,
309 }
310 }
311}
312
/// Reader over an audio file that is still being downloaded.
pub struct AudioFileStreaming {
    /// Read handle onto the temporary file the download is written to.
    read_file: fs::File,
    /// Current read offset within the file, in bytes.
    position: u64,
    /// Sender for commands to the background fetch task.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
    /// State shared with the fetch task and with stream loader controllers.
    shared: Arc<AudioFileShared>,
}
319
/// Bookkeeping of which byte ranges of the file were requested and which were downloaded.
struct AudioFileDownloadStatus {
    /// Ranges that have been requested; presumably still in flight — maintained outside
    /// this file.
    requested: RangeSet,
    /// Ranges whose data is available for reading.
    downloaded: RangeSet,
}
324
/// State shared between the streaming reader, its controllers and the background fetch task.
struct AudioFileShared {
    /// CDN URL the file is streamed from.
    cdn_url: String,
    /// Total size of the file in bytes, taken from the `Content-Range` header.
    file_size: usize,
    /// Nominal data rate of the file, used to convert read-ahead durations into bytes.
    bytes_per_second: usize,
    /// Condition variable waited on together with `download_status` until data arrives.
    cond: Condvar,
    /// Requested and downloaded ranges, guarded by a mutex.
    download_status: Mutex<AudioFileDownloadStatus>,
    /// `true` while in streaming mode, `false` in random access mode.
    download_streaming: AtomicBool,
    /// Semaphore created with one permit.
    /// NOTE(review): acquired outside this file — presumably limits concurrent downloads.
    download_slots: Semaphore,
    /// Last stored ping time in milliseconds; `0` means nothing was stored yet.
    ping_time_ms: AtomicUsize,
    /// Read position last published by the reader, in bytes.
    read_position: AtomicUsize,
    /// Last stored throughput value; unit not visible in this file.
    throughput: AtomicUsize,
}
337
338impl AudioFileShared {
339 fn is_download_streaming(&self) -> bool {
340 self.download_streaming.load(Ordering::Acquire)
341 }
342
343 fn set_download_streaming(&self, streaming: bool) {
344 self.download_streaming.store(streaming, Ordering::Release)
345 }
346
347 fn ping_time(&self) -> Duration {
348 let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
349 if ping_time_ms > 0 {
350 Duration::from_millis(ping_time_ms as u64)
351 } else {
352 AudioFetchParams::get().initial_ping_time_estimate
353 }
354 }
355
356 fn set_ping_time(&self, duration: Duration) {
357 self.ping_time_ms
358 .store(duration.as_millis() as usize, Ordering::Release)
359 }
360
361 fn throughput(&self) -> usize {
362 self.throughput.load(Ordering::Acquire)
363 }
364
365 fn set_throughput(&self, throughput: usize) {
366 self.throughput.store(throughput, Ordering::Release)
367 }
368
369 fn read_position(&self) -> usize {
370 self.read_position.load(Ordering::Acquire)
371 }
372
373 fn set_read_position(&self, position: u64) {
374 self.read_position
375 .store(position as usize, Ordering::Release)
376 }
377}
378
379impl AudioFile {
380 pub async fn open(
381 session: &Session,
382 file_id: FileId,
383 bytes_per_second: usize,
384 ) -> Result<AudioFile, Error> {
385 if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
386 debug!("File {file_id} already in cache");
387 return Ok(AudioFile::Cached(file));
388 }
389
390 debug!("Downloading file {file_id}");
391
392 let (complete_tx, complete_rx) = oneshot::channel();
393
394 let streaming =
395 AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
396
397 let session_ = session.clone();
398 session.spawn(complete_rx.map_ok(move |mut file| {
399 debug!("Downloading file {file_id} complete");
400
401 if let Some(cache) = session_.cache() {
402 if let Some(cache_id) = cache.file_path(file_id) {
403 if let Err(e) = cache.save_file(file_id, &mut file) {
404 error!("Error caching file {file_id} to {cache_id:?}: {e}");
405 } else {
406 debug!("File {file_id} cached to {cache_id:?}");
407 }
408 }
409 }
410 }));
411
412 Ok(AudioFile::Streaming(streaming.await?))
413 }
414
415 pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
416 let controller = match self {
417 AudioFile::Streaming(stream) => StreamLoaderController {
418 channel_tx: Some(stream.stream_loader_command_tx.clone()),
419 stream_shared: Some(stream.shared.clone()),
420 file_size: stream.shared.file_size,
421 },
422 AudioFile::Cached(file) => StreamLoaderController {
423 channel_tx: None,
424 stream_shared: None,
425 file_size: file.metadata()?.len() as usize,
426 },
427 };
428
429 Ok(controller)
430 }
431
432 pub fn is_cached(&self) -> bool {
433 matches!(self, AudioFile::Cached { .. })
434 }
435}
436
437impl AudioFileStreaming {
438 pub async fn open(
439 session: Session,
440 file_id: FileId,
441 complete_tx: oneshot::Sender<NamedTempFile>,
442 bytes_per_second: usize,
443 ) -> Result<AudioFileStreaming, Error> {
444 let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
445
446 let minimum_download_size = AudioFetchParams::get().minimum_download_size;
447
448 let mut response_streamer_url = None;
449 let urls = cdn_url.try_get_urls()?;
450 for url in &urls {
451 let mut streamer =
456 session
457 .spclient()
458 .stream_from_cdn(*url, 0, minimum_download_size)?;
459
460 let streamer_result = tokio::time::timeout(Duration::from_secs(10), streamer.next())
464 .await
465 .map_err(|_| AudioFileError::WaitTimeout.into())
466 .and_then(|x| x.ok_or_else(|| AudioFileError::NoData.into()))
467 .and_then(|x| x.map_err(Error::from));
468
469 match streamer_result {
470 Ok(r) => {
471 response_streamer_url = Some((r, streamer, url));
472 break;
473 }
474 Err(e) => warn!("Fetching {url} failed with error {e:?}, trying next"),
475 }
476 }
477
478 let Some((response, streamer, url)) = response_streamer_url else {
479 return Err(Error::unavailable(format!(
480 "{} URLs failed, none left to try",
481 urls.len()
482 )));
483 };
484
485 trace!("Streaming from {url}");
486
487 let code = response.status();
488 if code != StatusCode::PARTIAL_CONTENT {
489 debug!("Opening audio file expected partial content but got: {code}");
490 return Err(AudioFileError::StatusCode(code).into());
491 }
492
493 let header_value = response
494 .headers()
495 .get(CONTENT_RANGE)
496 .ok_or(AudioFileError::Header)?;
497 let str_value = header_value.to_str()?;
498 let hyphen_index = str_value.find('-').unwrap_or_default();
499 let slash_index = str_value.find('/').unwrap_or_default();
500 let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
501 let file_size = str_value[slash_index + 1..].parse()?;
502
503 let initial_request = StreamingRequest {
504 streamer,
505 initial_response: Some(response),
506 offset: 0,
507 length: upper_bound + 1,
508 };
509
510 let shared = Arc::new(AudioFileShared {
511 cdn_url: url.to_string(),
512 file_size,
513 bytes_per_second,
514 cond: Condvar::new(),
515 download_status: Mutex::new(AudioFileDownloadStatus {
516 requested: RangeSet::new(),
517 downloaded: RangeSet::new(),
518 }),
519 download_streaming: AtomicBool::new(false),
520 download_slots: Semaphore::new(1),
521 ping_time_ms: AtomicUsize::new(0),
522 read_position: AtomicUsize::new(0),
523 throughput: AtomicUsize::new(0),
524 });
525
526 let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
527 write_file.as_file().set_len(file_size as u64)?;
528
529 let read_file = write_file.reopen()?;
530
531 let (stream_loader_command_tx, stream_loader_command_rx) =
532 mpsc::unbounded_channel::<StreamLoaderCommand>();
533
534 session.spawn(audio_file_fetch(
535 session.clone(),
536 shared.clone(),
537 initial_request,
538 write_file,
539 stream_loader_command_rx,
540 complete_tx,
541 ));
542
543 Ok(AudioFileStreaming {
544 read_file,
545 position: 0,
546 stream_loader_command_tx,
547 shared,
548 })
549 }
550}
551
552impl Read for AudioFileStreaming {
553 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
554 let offset = self.position as usize;
555
556 if offset >= self.shared.file_size {
557 return Ok(0);
558 }
559
560 let length = min(output.len(), self.shared.file_size - offset);
561 if length == 0 {
562 return Ok(0);
563 }
564
565 let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
566 let length_to_request = if self.shared.is_download_streaming() {
567 let length_to_request = length
568 + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
569 as usize;
570
571 min(length_to_request, self.shared.file_size - offset)
573 } else {
574 length
575 };
576
577 let mut ranges_to_request = RangeSet::new();
578 ranges_to_request.add_range(&Range::new(offset, length_to_request));
579
580 let mut download_status = self
581 .shared
582 .download_status
583 .lock()
584 .expect(DOWNLOAD_STATUS_POISON_MSG);
585
586 ranges_to_request.subtract_range_set(&download_status.downloaded);
587 ranges_to_request.subtract_range_set(&download_status.requested);
588
589 for &range in ranges_to_request.iter() {
590 self.stream_loader_command_tx
591 .send(StreamLoaderCommand::Fetch(range))
592 .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
593 }
594
595 let download_timeout = AudioFetchParams::get().download_timeout;
596 while !download_status.downloaded.contains(offset) {
597 let (new_download_status, wait_result) = self
598 .shared
599 .cond
600 .wait_timeout(download_status, download_timeout)
601 .expect(DOWNLOAD_STATUS_POISON_MSG);
602
603 download_status = new_download_status;
604 if wait_result.timed_out() {
605 return Err(io::Error::new(
606 io::ErrorKind::TimedOut,
607 Error::deadline_exceeded(AudioFileError::WaitTimeout),
608 ));
609 }
610 }
611 let available_length = download_status
612 .downloaded
613 .contained_length_from_value(offset);
614
615 drop(download_status);
616
617 self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
618 let read_len = min(length, available_length);
619 let read_len = self.read_file.read(&mut output[..read_len])?;
620
621 self.position += read_len as u64;
622 self.shared.set_read_position(self.position);
623
624 Ok(read_len)
625 }
626}
627
628impl Seek for AudioFileStreaming {
629 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
630 let current_position = self.position as i64;
633 let requested_pos = match pos {
634 SeekFrom::Start(pos) => pos as i64,
635 SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
636 SeekFrom::Current(pos) => current_position + pos,
637 };
638 if requested_pos == current_position {
639 return Ok(current_position as u64);
640 }
641
642 let available = self
644 .shared
645 .download_status
646 .lock()
647 .expect(DOWNLOAD_STATUS_POISON_MSG)
648 .downloaded
649 .contains(requested_pos as usize);
650
651 let mut was_streaming = false;
652 if !available {
653 was_streaming = self.shared.is_download_streaming();
657 if was_streaming {
658 self.shared.set_download_streaming(false);
659 }
660 }
661
662 self.position = self.read_file.seek(pos)?;
663 self.shared.set_read_position(self.position);
664
665 if !available && was_streaming {
666 self.shared.set_download_streaming(true);
667 }
668
669 Ok(self.position)
670 }
671}
672
673impl Read for AudioFile {
674 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
675 match *self {
676 AudioFile::Cached(ref mut file) => file.read(output),
677 AudioFile::Streaming(ref mut file) => file.read(output),
678 }
679 }
680}
681
682impl Seek for AudioFile {
683 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
684 match *self {
685 AudioFile::Cached(ref mut file) => file.seek(pos),
686 AudioFile::Streaming(ref mut file) => file.seek(pos),
687 }
688 }
689}