1mod receive;
2
3use std::{
4 cmp::min,
5 fs,
6 io::{self, Read, Seek, SeekFrom},
7 sync::{
8 atomic::{AtomicBool, AtomicUsize, Ordering},
9 Arc, OnceLock,
10 },
11 time::Duration,
12};
13
14use futures_util::{future::IntoStream, StreamExt, TryFutureExt};
15use hyper::{body::Incoming, header::CONTENT_RANGE, Response, StatusCode};
16use hyper_util::client::legacy::ResponseFuture;
17use parking_lot::{Condvar, Mutex};
18use tempfile::NamedTempFile;
19use thiserror::Error;
20use tokio::sync::{mpsc, oneshot, Semaphore};
21
22use librespot_core::{cdn_url::CdnUrl, Error, FileId, Session};
23
24use self::receive::audio_file_fetch;
25
26use crate::range_set::{Range, RangeSet};
27
28pub type AudioFileResult = Result<(), librespot_core::Error>;
29
30#[derive(Error, Debug)]
31pub enum AudioFileError {
32 #[error("other end of channel disconnected")]
33 Channel,
34 #[error("required header not found")]
35 Header,
36 #[error("streamer received no data")]
37 NoData,
38 #[error("no output available")]
39 Output,
40 #[error("invalid status code {0}")]
41 StatusCode(StatusCode),
42 #[error("wait timeout exceeded")]
43 WaitTimeout,
44}
45
46impl From<AudioFileError> for Error {
47 fn from(err: AudioFileError) -> Self {
48 match err {
49 AudioFileError::Channel => Error::aborted(err),
50 AudioFileError::Header => Error::unavailable(err),
51 AudioFileError::NoData => Error::unavailable(err),
52 AudioFileError::Output => Error::aborted(err),
53 AudioFileError::StatusCode(_) => Error::failed_precondition(err),
54 AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
55 }
56 }
57}
58
/// Tunable parameters for fetching audio data.
///
/// One process-wide instance lives in a [`OnceLock`]. Install custom values
/// with [`AudioFetchParams::set`] *before* the first call to
/// [`AudioFetchParams::get`]; otherwise the defaults are locked in.
#[derive(Clone)]
pub struct AudioFetchParams {
    /// Length of the initial request issued when a streamed file is opened.
    pub minimum_download_size: usize,

    /// Lowest throughput we expect, in the same size unit per second. The
    /// default `download_timeout` is `minimum_download_size / minimum_throughput`
    /// seconds.
    pub minimum_throughput: usize,

    /// Ping time reported while no ping time has been measured yet.
    pub initial_ping_time_estimate: Duration,

    /// Upper bound for the ping time.
    /// NOTE(review): not read in this file; presumably consumed by the fetch task.
    pub maximum_assumed_ping_time: Duration,

    /// Amount of audio to have read ahead before playback starts.
    /// NOTE(review): not read in this file; presumably consumed by callers.
    pub read_ahead_before_playback: Duration,

    /// While in streaming mode, every read additionally requests this much
    /// audio (converted to bytes via the file's nominal byte rate) ahead of
    /// the read position.
    pub read_ahead_during_playback: Duration,

    /// Factor applied when deciding whether to pre-fetch more data.
    /// NOTE(review): not read in this file; presumably consumed by the fetch task.
    pub prefetch_threshold_factor: f32,

    /// How long a blocking wait for download progress may last before it
    /// fails with a timeout.
    pub download_timeout: Duration,
}

impl Default for AudioFetchParams {
    /// Builds the stock parameter set.
    fn default() -> Self {
        const KIB: usize = 1024;

        let minimum_download_size = 64 * KIB;
        let minimum_throughput = 8 * KIB;
        // Time needed to transfer one minimum-sized block at the minimum throughput.
        let timeout_secs = (minimum_download_size / minimum_throughput) as u64;

        Self {
            minimum_download_size,
            minimum_throughput,
            initial_ping_time_estimate: Duration::from_millis(500),
            maximum_assumed_ping_time: Duration::from_millis(1500),
            read_ahead_before_playback: Duration::from_secs(1),
            read_ahead_during_playback: Duration::from_secs(5),
            prefetch_threshold_factor: 4.0,
            download_timeout: Duration::from_secs(timeout_secs),
        }
    }
}

/// Process-wide fetch parameters; written at most once.
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();

impl AudioFetchParams {
    /// Installs `params` as the process-wide parameters.
    ///
    /// # Errors
    ///
    /// Hands `params` back as `Err` when the parameters were already
    /// initialised, either by an earlier `set` or by a call to [`Self::get`].
    pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
        AUDIO_FETCH_PARAMS.set(params)
    }

    /// Returns the process-wide parameters, initialising them with
    /// [`AudioFetchParams::default`] on first use.
    pub fn get() -> &'static AudioFetchParams {
        AUDIO_FETCH_PARAMS.get_or_init(Self::default)
    }
}
128
129pub enum AudioFile {
130 Cached(fs::File),
131 Streaming(AudioFileStreaming),
132}
133
134#[derive(Debug)]
135pub struct StreamingRequest {
136 streamer: IntoStream<ResponseFuture>,
137 initial_response: Option<Response<Incoming>>,
138 offset: usize,
139 length: usize,
140}
141
142#[derive(Debug)]
143pub enum StreamLoaderCommand {
144 Fetch(Range), Close, }
147
148#[derive(Clone)]
149pub struct StreamLoaderController {
150 channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
151 stream_shared: Option<Arc<AudioFileShared>>,
152 file_size: usize,
153}
154
155impl StreamLoaderController {
156 pub fn len(&self) -> usize {
157 self.file_size
158 }
159
160 pub fn is_empty(&self) -> bool {
161 self.file_size == 0
162 }
163
164 pub fn range_available(&self, range: Range) -> bool {
165 let available = if let Some(ref shared) = self.stream_shared {
166 let download_status = shared.download_status.lock();
167
168 range.length
169 <= download_status
170 .downloaded
171 .contained_length_from_value(range.start)
172 } else {
173 range.length <= self.len() - range.start
174 };
175
176 available
177 }
178
179 pub fn range_to_end_available(&self) -> bool {
180 match self.stream_shared {
181 Some(ref shared) => {
182 let read_position = shared.read_position();
183 self.range_available(Range::new(read_position, self.len() - read_position))
184 }
185 None => true,
186 }
187 }
188
189 pub fn ping_time(&self) -> Option<Duration> {
190 self.stream_shared.as_ref().map(|shared| shared.ping_time())
191 }
192
193 fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
194 if let Some(ref channel) = self.channel_tx {
195 let _ = channel.send(command);
198 }
199 }
200
201 pub fn fetch(&self, range: Range) {
202 self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
204 }
205
206 pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
207 if range.start >= self.len() {
211 range.length = 0;
212 } else if range.end() > self.len() {
213 range.length = self.len() - range.start;
214 }
215
216 self.fetch(range);
217
218 if let Some(ref shared) = self.stream_shared {
219 let mut download_status = shared.download_status.lock();
220 let download_timeout = AudioFetchParams::get().download_timeout;
221
222 while range.length
223 > download_status
224 .downloaded
225 .contained_length_from_value(range.start)
226 {
227 if shared
228 .cond
229 .wait_for(&mut download_status, download_timeout)
230 .timed_out()
231 {
232 return Err(AudioFileError::WaitTimeout.into());
233 }
234
235 if range.length
236 > (download_status
237 .downloaded
238 .union(&download_status.requested)
239 .contained_length_from_value(range.start))
240 {
241 self.fetch(range);
244 }
245 }
246 }
247
248 Ok(())
249 }
250
251 pub fn fetch_next_and_wait(
252 &self,
253 request_length: usize,
254 wait_length: usize,
255 ) -> AudioFileResult {
256 match self.stream_shared {
257 Some(ref shared) => {
258 let start = shared.read_position();
259
260 let request_range = Range {
261 start,
262 length: request_length,
263 };
264 self.fetch(request_range);
265
266 let wait_range = Range {
267 start,
268 length: wait_length,
269 };
270 self.fetch_blocking(wait_range)
271 }
272 None => Ok(()),
273 }
274 }
275
276 pub fn set_random_access_mode(&self) {
277 if let Some(ref shared) = self.stream_shared {
279 shared.set_download_streaming(false)
280 }
281 }
282
283 pub fn set_stream_mode(&self) {
284 if let Some(ref shared) = self.stream_shared {
286 shared.set_download_streaming(true)
287 }
288 }
289
290 pub fn close(&self) {
291 self.send_stream_loader_command(StreamLoaderCommand::Close);
293 }
294}
295
296pub struct AudioFileStreaming {
297 read_file: fs::File,
298 position: u64,
299 stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
300 shared: Arc<AudioFileShared>,
301}
302
303struct AudioFileDownloadStatus {
304 requested: RangeSet,
305 downloaded: RangeSet,
306}
307
308struct AudioFileShared {
309 cdn_url: CdnUrl,
310 file_size: usize,
311 bytes_per_second: usize,
312 cond: Condvar,
313 download_status: Mutex<AudioFileDownloadStatus>,
314 download_streaming: AtomicBool,
315 download_slots: Semaphore,
316 ping_time_ms: AtomicUsize,
317 read_position: AtomicUsize,
318 throughput: AtomicUsize,
319}
320
321impl AudioFileShared {
322 fn is_download_streaming(&self) -> bool {
323 self.download_streaming.load(Ordering::Acquire)
324 }
325
326 fn set_download_streaming(&self, streaming: bool) {
327 self.download_streaming.store(streaming, Ordering::Release)
328 }
329
330 fn ping_time(&self) -> Duration {
331 let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
332 if ping_time_ms > 0 {
333 Duration::from_millis(ping_time_ms as u64)
334 } else {
335 AudioFetchParams::get().initial_ping_time_estimate
336 }
337 }
338
339 fn set_ping_time(&self, duration: Duration) {
340 self.ping_time_ms
341 .store(duration.as_millis() as usize, Ordering::Release)
342 }
343
344 fn throughput(&self) -> usize {
345 self.throughput.load(Ordering::Acquire)
346 }
347
348 fn set_throughput(&self, throughput: usize) {
349 self.throughput.store(throughput, Ordering::Release)
350 }
351
352 fn read_position(&self) -> usize {
353 self.read_position.load(Ordering::Acquire)
354 }
355
356 fn set_read_position(&self, position: u64) {
357 self.read_position
358 .store(position as usize, Ordering::Release)
359 }
360}
361
362impl AudioFile {
363 pub async fn open(
364 session: &Session,
365 file_id: FileId,
366 bytes_per_second: usize,
367 ) -> Result<AudioFile, Error> {
368 if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
369 debug!("File {} already in cache", file_id);
370 return Ok(AudioFile::Cached(file));
371 }
372
373 debug!("Downloading file {}", file_id);
374
375 let (complete_tx, complete_rx) = oneshot::channel();
376
377 let streaming =
378 AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
379
380 let session_ = session.clone();
381 session.spawn(complete_rx.map_ok(move |mut file| {
382 debug!("Downloading file {} complete", file_id);
383
384 if let Some(cache) = session_.cache() {
385 if let Some(cache_id) = cache.file_path(file_id) {
386 if let Err(e) = cache.save_file(file_id, &mut file) {
387 error!("Error caching file {} to {:?}: {}", file_id, cache_id, e);
388 } else {
389 debug!("File {} cached to {:?}", file_id, cache_id);
390 }
391 }
392 }
393 }));
394
395 Ok(AudioFile::Streaming(streaming.await?))
396 }
397
398 pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
399 let controller = match self {
400 AudioFile::Streaming(ref stream) => StreamLoaderController {
401 channel_tx: Some(stream.stream_loader_command_tx.clone()),
402 stream_shared: Some(stream.shared.clone()),
403 file_size: stream.shared.file_size,
404 },
405 AudioFile::Cached(ref file) => StreamLoaderController {
406 channel_tx: None,
407 stream_shared: None,
408 file_size: file.metadata()?.len() as usize,
409 },
410 };
411
412 Ok(controller)
413 }
414
415 pub fn is_cached(&self) -> bool {
416 matches!(self, AudioFile::Cached { .. })
417 }
418}
419
420impl AudioFileStreaming {
421 pub async fn open(
422 session: Session,
423 file_id: FileId,
424 complete_tx: oneshot::Sender<NamedTempFile>,
425 bytes_per_second: usize,
426 ) -> Result<AudioFileStreaming, Error> {
427 let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
428
429 if let Ok(url) = cdn_url.try_get_url() {
430 trace!("Streaming from {}", url);
431 }
432
433 let minimum_download_size = AudioFetchParams::get().minimum_download_size;
434
435 let mut streamer =
440 session
441 .spclient()
442 .stream_from_cdn(&cdn_url, 0, minimum_download_size)?;
443
444 let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
448
449 let code = response.status();
450 if code != StatusCode::PARTIAL_CONTENT {
451 debug!(
452 "Opening audio file expected partial content but got: {}",
453 code
454 );
455 return Err(AudioFileError::StatusCode(code).into());
456 }
457
458 let header_value = response
459 .headers()
460 .get(CONTENT_RANGE)
461 .ok_or(AudioFileError::Header)?;
462 let str_value = header_value.to_str()?;
463 let hyphen_index = str_value.find('-').unwrap_or_default();
464 let slash_index = str_value.find('/').unwrap_or_default();
465 let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
466 let file_size = str_value[slash_index + 1..].parse()?;
467
468 let initial_request = StreamingRequest {
469 streamer,
470 initial_response: Some(response),
471 offset: 0,
472 length: upper_bound + 1,
473 };
474
475 let shared = Arc::new(AudioFileShared {
476 cdn_url,
477 file_size,
478 bytes_per_second,
479 cond: Condvar::new(),
480 download_status: Mutex::new(AudioFileDownloadStatus {
481 requested: RangeSet::new(),
482 downloaded: RangeSet::new(),
483 }),
484 download_streaming: AtomicBool::new(false),
485 download_slots: Semaphore::new(1),
486 ping_time_ms: AtomicUsize::new(0),
487 read_position: AtomicUsize::new(0),
488 throughput: AtomicUsize::new(0),
489 });
490
491 let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
492 write_file.as_file().set_len(file_size as u64)?;
493
494 let read_file = write_file.reopen()?;
495
496 let (stream_loader_command_tx, stream_loader_command_rx) =
497 mpsc::unbounded_channel::<StreamLoaderCommand>();
498
499 session.spawn(audio_file_fetch(
500 session.clone(),
501 shared.clone(),
502 initial_request,
503 write_file,
504 stream_loader_command_rx,
505 complete_tx,
506 ));
507
508 Ok(AudioFileStreaming {
509 read_file,
510 position: 0,
511 stream_loader_command_tx,
512 shared,
513 })
514 }
515}
516
517impl Read for AudioFileStreaming {
518 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
519 let offset = self.position as usize;
520
521 if offset >= self.shared.file_size {
522 return Ok(0);
523 }
524
525 let length = min(output.len(), self.shared.file_size - offset);
526 if length == 0 {
527 return Ok(0);
528 }
529
530 let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
531 let length_to_request = if self.shared.is_download_streaming() {
532 let length_to_request = length
533 + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
534 as usize;
535
536 min(length_to_request, self.shared.file_size - offset)
538 } else {
539 length
540 };
541
542 let mut ranges_to_request = RangeSet::new();
543 ranges_to_request.add_range(&Range::new(offset, length_to_request));
544
545 let mut download_status = self.shared.download_status.lock();
546
547 ranges_to_request.subtract_range_set(&download_status.downloaded);
548 ranges_to_request.subtract_range_set(&download_status.requested);
549
550 for &range in ranges_to_request.iter() {
551 self.stream_loader_command_tx
552 .send(StreamLoaderCommand::Fetch(range))
553 .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
554 }
555
556 let download_timeout = AudioFetchParams::get().download_timeout;
557 while !download_status.downloaded.contains(offset) {
558 if self
559 .shared
560 .cond
561 .wait_for(&mut download_status, download_timeout)
562 .timed_out()
563 {
564 return Err(io::Error::new(
565 io::ErrorKind::TimedOut,
566 Error::deadline_exceeded(AudioFileError::WaitTimeout),
567 ));
568 }
569 }
570 let available_length = download_status
571 .downloaded
572 .contained_length_from_value(offset);
573
574 drop(download_status);
575
576 self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
577 let read_len = min(length, available_length);
578 let read_len = self.read_file.read(&mut output[..read_len])?;
579
580 self.position += read_len as u64;
581 self.shared.set_read_position(self.position);
582
583 Ok(read_len)
584 }
585}
586
587impl Seek for AudioFileStreaming {
588 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
589 let current_position = self.position as i64;
592 let requested_pos = match pos {
593 SeekFrom::Start(pos) => pos as i64,
594 SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
595 SeekFrom::Current(pos) => current_position + pos,
596 };
597 if requested_pos == current_position {
598 return Ok(current_position as u64);
599 }
600
601 let available = self
603 .shared
604 .download_status
605 .lock()
606 .downloaded
607 .contains(requested_pos as usize);
608
609 let mut was_streaming = false;
610 if !available {
611 was_streaming = self.shared.is_download_streaming();
615 if was_streaming {
616 self.shared.set_download_streaming(false);
617 }
618 }
619
620 self.position = self.read_file.seek(pos)?;
621 self.shared.set_read_position(self.position);
622
623 if !available && was_streaming {
624 self.shared.set_download_streaming(true);
625 }
626
627 Ok(self.position)
628 }
629}
630
631impl Read for AudioFile {
632 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
633 match *self {
634 AudioFile::Cached(ref mut file) => file.read(output),
635 AudioFile::Streaming(ref mut file) => file.read(output),
636 }
637 }
638}
639
640impl Seek for AudioFile {
641 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
642 match *self {
643 AudioFile::Cached(ref mut file) => file.seek(pos),
644 AudioFile::Streaming(ref mut file) => file.seek(pos),
645 }
646 }
647}