1mod receive;
2
3use std::{
4 cmp::min,
5 fs,
6 io::{self, Read, Seek, SeekFrom},
7 sync::{
8 Arc, OnceLock,
9 atomic::{AtomicBool, AtomicUsize, Ordering},
10 },
11 time::Duration,
12};
13
14use futures_util::{StreamExt, TryFutureExt, future::IntoStream};
15use hyper::{Response, StatusCode, body::Incoming, header::CONTENT_RANGE};
16use hyper_util::client::legacy::ResponseFuture;
17use parking_lot::{Condvar, Mutex};
18use tempfile::NamedTempFile;
19use thiserror::Error;
20use tokio::sync::{Semaphore, mpsc, oneshot};
21
22use librespot_core::{Error, FileId, Session, cdn_url::CdnUrl};
23
24use self::receive::audio_file_fetch;
25
26use crate::range_set::{Range, RangeSet};
27
/// Result type returned by audio-file operations that produce no value:
/// `Ok(())` on success, otherwise a [`librespot_core::Error`].
pub type AudioFileResult = Result<(), librespot_core::Error>;
29
/// Errors that can occur while opening or streaming an audio file.
///
/// Every variant converts into the crate-wide [`Error`] through the
/// `From<AudioFileError> for Error` implementation.
#[derive(Error, Debug)]
pub enum AudioFileError {
    /// The other end of a channel has been dropped.
    #[error("other end of channel disconnected")]
    Channel,
    /// A required HTTP header was missing from a response, e.g. `Content-Range`
    /// on the initial CDN response.
    #[error("required header not found")]
    Header,
    /// The response stream ended without yielding any item.
    #[error("streamer received no data")]
    NoData,
    /// No output was available.
    // NOTE(review): not raised anywhere in this file; presumably used by the
    // `receive` module — confirm.
    #[error("no output available")]
    Output,
    /// The server answered with an unexpected HTTP status code.
    #[error("invalid status code {0}")]
    StatusCode(StatusCode),
    /// Waiting for a response or for downloaded data took longer than allowed.
    #[error("wait timeout exceeded")]
    WaitTimeout,
}
45
46impl From<AudioFileError> for Error {
47 fn from(err: AudioFileError) -> Self {
48 match err {
49 AudioFileError::Channel => Error::aborted(err),
50 AudioFileError::Header => Error::unavailable(err),
51 AudioFileError::NoData => Error::unavailable(err),
52 AudioFileError::Output => Error::aborted(err),
53 AudioFileError::StatusCode(_) => Error::failed_precondition(err),
54 AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
55 }
56 }
57}
58
/// Tunable parameters that govern how audio data is fetched.
///
/// A process-wide instance is available through [`AudioFetchParams::get`]. It can
/// be overridden once with [`AudioFetchParams::set`], provided that happens before
/// the first call to `get`.
#[derive(Clone)]
pub struct AudioFetchParams {
    /// Length, in bytes, of the initial request issued when a streamed file is opened.
    // NOTE(review): presumably also the minimum block size of later requests —
    // confirm against the fetch code.
    pub minimum_download_size: usize,

    /// Minimum network throughput that is expected. Together with
    /// `minimum_download_size` it determines the default `download_timeout`.
    // NOTE(review): presumably bytes per second, judging by how the default
    // timeout is derived — confirm.
    pub minimum_throughput: usize,

    /// Ping time assumed for as long as no real ping time has been recorded.
    pub initial_ping_time_estimate: Duration,

    /// Upper limit for the ping time used in calculations.
    // NOTE(review): not referenced in this file; presumably measured ping times
    // are capped to this value — confirm.
    pub maximum_assumed_ping_time: Duration,

    /// Amount of audio, expressed as a duration, to have buffered before playback starts.
    // NOTE(review): not referenced in this file — confirm against the player.
    pub read_ahead_before_playback: Duration,

    /// While in streaming mode, each read additionally requests this much audio
    /// ahead of the read position. The duration is converted to bytes using the
    /// file's nominal bytes-per-second rate.
    pub read_ahead_during_playback: Duration,

    /// Factor used when deciding whether to pre-fetch more data.
    // NOTE(review): not referenced in this file; presumably scales a
    // ping-time-based threshold in the fetch code — confirm.
    pub prefetch_threshold_factor: f32,

    /// Maximum time to wait on the download-status condition variable before a
    /// blocking read or fetch gives up with a timeout error.
    pub download_timeout: Duration,
}

impl Default for AudioFetchParams {
    /// Built-in defaults: 64 KiB minimum download size, 8 KiB minimum throughput
    /// and a download timeout of their quotient in whole seconds.
    fn default() -> Self {
        const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024;
        const MINIMUM_THROUGHPUT: usize = 8 * 1024;

        // Time needed to transfer one minimum-size block at the minimum throughput.
        let download_timeout_secs = (MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64;

        Self {
            minimum_download_size: MINIMUM_DOWNLOAD_SIZE,
            minimum_throughput: MINIMUM_THROUGHPUT,
            initial_ping_time_estimate: Duration::from_millis(500),
            maximum_assumed_ping_time: Duration::from_millis(1500),
            read_ahead_before_playback: Duration::from_secs(1),
            read_ahead_during_playback: Duration::from_secs(5),
            prefetch_threshold_factor: 4.0,
            download_timeout: Duration::from_secs(download_timeout_secs),
        }
    }
}

/// Process-wide fetch parameters, initialised at most once.
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();

impl AudioFetchParams {
    /// Installs `params` as the process-wide parameters.
    ///
    /// # Errors
    ///
    /// Hands `params` back as `Err` when the global instance was already
    /// initialised, either by an earlier `set` or by a call to [`Self::get`].
    pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
        AUDIO_FETCH_PARAMS.set(params)
    }

    /// Returns the process-wide parameters, falling back to (and installing)
    /// the defaults when none were set.
    pub fn get() -> &'static AudioFetchParams {
        AUDIO_FETCH_PARAMS.get_or_init(Self::default)
    }
}
128
/// An audio file that is either read from the local cache or streamed.
pub enum AudioFile {
    /// The file was found in the session cache and is read directly from disk.
    Cached(fs::File),
    /// The file is being downloaded and is read while the download progresses.
    Streaming(AudioFileStreaming),
}
133
/// A single in-flight request for a byte range of the audio file.
#[derive(Debug)]
pub struct StreamingRequest {
    /// Stream wrapping the pending HTTP response future.
    streamer: IntoStream<ResponseFuture>,
    /// Response that was already taken from `streamer`, if any. The initial
    /// request created while opening the file carries its response here.
    initial_response: Option<Response<Incoming>>,
    /// Byte offset in the file at which the requested range starts.
    offset: usize,
    /// Number of bytes covered by the request.
    length: usize,
}
141
/// Commands sent over the command channel to the stream loader task.
#[derive(Debug)]
pub enum StreamLoaderCommand {
    /// Ask the loader to fetch the given byte range of the file.
    Fetch(Range),
    /// Sent by `StreamLoaderController::close`.
    // NOTE(review): presumably makes the loader stop fetching; the handling
    // lives in the `receive` module — confirm.
    Close,
}
147
/// Cloneable handle for querying and steering the download of an [`AudioFile`].
///
/// For a cached file both optional fields are `None` and only `file_size` is set.
#[derive(Clone)]
pub struct StreamLoaderController {
    /// Sender side of the command channel to the stream loader; `None` for cached files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    /// State shared with the streaming file; `None` for cached files.
    stream_shared: Option<Arc<AudioFileShared>>,
    /// Total size of the file in bytes.
    file_size: usize,
}
154
155impl StreamLoaderController {
156 pub fn len(&self) -> usize {
157 self.file_size
158 }
159
160 pub fn is_empty(&self) -> bool {
161 self.file_size == 0
162 }
163
164 pub fn range_available(&self, range: Range) -> bool {
165 if let Some(ref shared) = self.stream_shared {
166 let download_status = shared.download_status.lock();
167
168 range.length
169 <= download_status
170 .downloaded
171 .contained_length_from_value(range.start)
172 } else {
173 range.length <= self.len() - range.start
174 }
175 }
176
177 pub fn range_to_end_available(&self) -> bool {
178 match self.stream_shared {
179 Some(ref shared) => {
180 let read_position = shared.read_position();
181 self.range_available(Range::new(read_position, self.len() - read_position))
182 }
183 None => true,
184 }
185 }
186
187 pub fn ping_time(&self) -> Option<Duration> {
188 self.stream_shared.as_ref().map(|shared| shared.ping_time())
189 }
190
191 fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
192 if let Some(ref channel) = self.channel_tx {
193 let _ = channel.send(command);
196 }
197 }
198
199 pub fn fetch(&self, range: Range) {
200 self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
202 }
203
204 pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
205 if range.start >= self.len() {
209 range.length = 0;
210 } else if range.end() > self.len() {
211 range.length = self.len() - range.start;
212 }
213
214 self.fetch(range);
215
216 if let Some(ref shared) = self.stream_shared {
217 let mut download_status = shared.download_status.lock();
218 let download_timeout = AudioFetchParams::get().download_timeout;
219
220 while range.length
221 > download_status
222 .downloaded
223 .contained_length_from_value(range.start)
224 {
225 if shared
226 .cond
227 .wait_for(&mut download_status, download_timeout)
228 .timed_out()
229 {
230 return Err(AudioFileError::WaitTimeout.into());
231 }
232
233 if range.length
234 > (download_status
235 .downloaded
236 .union(&download_status.requested)
237 .contained_length_from_value(range.start))
238 {
239 self.fetch(range);
242 }
243 }
244 }
245
246 Ok(())
247 }
248
249 pub fn fetch_next_and_wait(
250 &self,
251 request_length: usize,
252 wait_length: usize,
253 ) -> AudioFileResult {
254 match self.stream_shared {
255 Some(ref shared) => {
256 let start = shared.read_position();
257
258 let request_range = Range {
259 start,
260 length: request_length,
261 };
262 self.fetch(request_range);
263
264 let wait_range = Range {
265 start,
266 length: wait_length,
267 };
268 self.fetch_blocking(wait_range)
269 }
270 None => Ok(()),
271 }
272 }
273
274 pub fn set_random_access_mode(&self) {
275 if let Some(ref shared) = self.stream_shared {
277 shared.set_download_streaming(false)
278 }
279 }
280
281 pub fn set_stream_mode(&self) {
282 if let Some(ref shared) = self.stream_shared {
284 shared.set_download_streaming(true)
285 }
286 }
287
288 pub fn close(&self) {
289 self.send_stream_loader_command(StreamLoaderCommand::Close);
291 }
292}
293
/// Reader over an audio file that is still being downloaded.
pub struct AudioFileStreaming {
    /// Read handle on the temporary file that the download is written into.
    read_file: fs::File,
    /// Current read position in bytes.
    position: u64,
    /// Sender used to request byte ranges from the stream loader.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
    /// State shared with the stream loader task and with controllers.
    shared: Arc<AudioFileShared>,
}
300
/// Bookkeeping of which byte ranges of the file are on their way or present.
struct AudioFileDownloadStatus {
    /// Ranges that have been requested.
    // NOTE(review): presumably requested but not yet downloaded; maintained by
    // the `receive` module — confirm.
    requested: RangeSet,
    /// Ranges whose data has been downloaded and can be read.
    downloaded: RangeSet,
}
305
/// State shared between the reader, its controllers and the stream loader task.
struct AudioFileShared {
    /// CDN URL the file is streamed from.
    cdn_url: String,
    /// Total file size in bytes, as reported by the initial `Content-Range` header.
    file_size: usize,
    /// Nominal data rate of the file, used to convert durations into byte counts.
    bytes_per_second: usize,
    /// Condition variable that readers wait on, together with `download_status`,
    /// until more data has been downloaded.
    cond: Condvar,
    /// Requested and downloaded byte ranges.
    download_status: Mutex<AudioFileDownloadStatus>,
    /// `true` in streaming mode, `false` in random access mode.
    download_streaming: AtomicBool,
    /// Semaphore created with a single permit.
    // NOTE(review): presumably limits concurrent downloads in the `receive`
    // module — confirm.
    download_slots: Semaphore,
    /// Last recorded ping time in milliseconds; `0` means none recorded yet.
    ping_time_ms: AtomicUsize,
    /// Read position of the reader in bytes.
    read_position: AtomicUsize,
    /// Last recorded throughput value.
    // NOTE(review): unit not visible in this file — confirm.
    throughput: AtomicUsize,
}
318
319impl AudioFileShared {
320 fn is_download_streaming(&self) -> bool {
321 self.download_streaming.load(Ordering::Acquire)
322 }
323
324 fn set_download_streaming(&self, streaming: bool) {
325 self.download_streaming.store(streaming, Ordering::Release)
326 }
327
328 fn ping_time(&self) -> Duration {
329 let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
330 if ping_time_ms > 0 {
331 Duration::from_millis(ping_time_ms as u64)
332 } else {
333 AudioFetchParams::get().initial_ping_time_estimate
334 }
335 }
336
337 fn set_ping_time(&self, duration: Duration) {
338 self.ping_time_ms
339 .store(duration.as_millis() as usize, Ordering::Release)
340 }
341
342 fn throughput(&self) -> usize {
343 self.throughput.load(Ordering::Acquire)
344 }
345
346 fn set_throughput(&self, throughput: usize) {
347 self.throughput.store(throughput, Ordering::Release)
348 }
349
350 fn read_position(&self) -> usize {
351 self.read_position.load(Ordering::Acquire)
352 }
353
354 fn set_read_position(&self, position: u64) {
355 self.read_position
356 .store(position as usize, Ordering::Release)
357 }
358}
359
360impl AudioFile {
361 pub async fn open(
362 session: &Session,
363 file_id: FileId,
364 bytes_per_second: usize,
365 ) -> Result<AudioFile, Error> {
366 if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
367 debug!("File {file_id} already in cache");
368 return Ok(AudioFile::Cached(file));
369 }
370
371 debug!("Downloading file {file_id}");
372
373 let (complete_tx, complete_rx) = oneshot::channel();
374
375 let streaming =
376 AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
377
378 let session_ = session.clone();
379 session.spawn(complete_rx.map_ok(move |mut file| {
380 debug!("Downloading file {file_id} complete");
381
382 if let Some(cache) = session_.cache() {
383 if let Some(cache_id) = cache.file_path(file_id) {
384 if let Err(e) = cache.save_file(file_id, &mut file) {
385 error!("Error caching file {file_id} to {cache_id:?}: {e}");
386 } else {
387 debug!("File {file_id} cached to {cache_id:?}");
388 }
389 }
390 }
391 }));
392
393 Ok(AudioFile::Streaming(streaming.await?))
394 }
395
396 pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
397 let controller = match self {
398 AudioFile::Streaming(stream) => StreamLoaderController {
399 channel_tx: Some(stream.stream_loader_command_tx.clone()),
400 stream_shared: Some(stream.shared.clone()),
401 file_size: stream.shared.file_size,
402 },
403 AudioFile::Cached(file) => StreamLoaderController {
404 channel_tx: None,
405 stream_shared: None,
406 file_size: file.metadata()?.len() as usize,
407 },
408 };
409
410 Ok(controller)
411 }
412
413 pub fn is_cached(&self) -> bool {
414 matches!(self, AudioFile::Cached { .. })
415 }
416}
417
418impl AudioFileStreaming {
419 pub async fn open(
420 session: Session,
421 file_id: FileId,
422 complete_tx: oneshot::Sender<NamedTempFile>,
423 bytes_per_second: usize,
424 ) -> Result<AudioFileStreaming, Error> {
425 let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
426
427 let minimum_download_size = AudioFetchParams::get().minimum_download_size;
428
429 let mut response_streamer_url = None;
430 let urls = cdn_url.try_get_urls()?;
431 for url in &urls {
432 let mut streamer =
437 session
438 .spclient()
439 .stream_from_cdn(*url, 0, minimum_download_size)?;
440
441 let streamer_result = tokio::time::timeout(Duration::from_secs(10), streamer.next())
445 .await
446 .map_err(|_| AudioFileError::WaitTimeout.into())
447 .and_then(|x| x.ok_or_else(|| AudioFileError::NoData.into()))
448 .and_then(|x| x.map_err(Error::from));
449
450 match streamer_result {
451 Ok(r) => {
452 response_streamer_url = Some((r, streamer, url));
453 break;
454 }
455 Err(e) => warn!("Fetching {url} failed with error {e:?}, trying next"),
456 }
457 }
458
459 let Some((response, streamer, url)) = response_streamer_url else {
460 return Err(Error::unavailable(format!(
461 "{} URLs failed, none left to try",
462 urls.len()
463 )));
464 };
465
466 trace!("Streaming from {url}");
467
468 let code = response.status();
469 if code != StatusCode::PARTIAL_CONTENT {
470 debug!("Opening audio file expected partial content but got: {code}");
471 return Err(AudioFileError::StatusCode(code).into());
472 }
473
474 let header_value = response
475 .headers()
476 .get(CONTENT_RANGE)
477 .ok_or(AudioFileError::Header)?;
478 let str_value = header_value.to_str()?;
479 let hyphen_index = str_value.find('-').unwrap_or_default();
480 let slash_index = str_value.find('/').unwrap_or_default();
481 let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
482 let file_size = str_value[slash_index + 1..].parse()?;
483
484 let initial_request = StreamingRequest {
485 streamer,
486 initial_response: Some(response),
487 offset: 0,
488 length: upper_bound + 1,
489 };
490
491 let shared = Arc::new(AudioFileShared {
492 cdn_url: url.to_string(),
493 file_size,
494 bytes_per_second,
495 cond: Condvar::new(),
496 download_status: Mutex::new(AudioFileDownloadStatus {
497 requested: RangeSet::new(),
498 downloaded: RangeSet::new(),
499 }),
500 download_streaming: AtomicBool::new(false),
501 download_slots: Semaphore::new(1),
502 ping_time_ms: AtomicUsize::new(0),
503 read_position: AtomicUsize::new(0),
504 throughput: AtomicUsize::new(0),
505 });
506
507 let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
508 write_file.as_file().set_len(file_size as u64)?;
509
510 let read_file = write_file.reopen()?;
511
512 let (stream_loader_command_tx, stream_loader_command_rx) =
513 mpsc::unbounded_channel::<StreamLoaderCommand>();
514
515 session.spawn(audio_file_fetch(
516 session.clone(),
517 shared.clone(),
518 initial_request,
519 write_file,
520 stream_loader_command_rx,
521 complete_tx,
522 ));
523
524 Ok(AudioFileStreaming {
525 read_file,
526 position: 0,
527 stream_loader_command_tx,
528 shared,
529 })
530 }
531}
532
533impl Read for AudioFileStreaming {
534 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
535 let offset = self.position as usize;
536
537 if offset >= self.shared.file_size {
538 return Ok(0);
539 }
540
541 let length = min(output.len(), self.shared.file_size - offset);
542 if length == 0 {
543 return Ok(0);
544 }
545
546 let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
547 let length_to_request = if self.shared.is_download_streaming() {
548 let length_to_request = length
549 + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
550 as usize;
551
552 min(length_to_request, self.shared.file_size - offset)
554 } else {
555 length
556 };
557
558 let mut ranges_to_request = RangeSet::new();
559 ranges_to_request.add_range(&Range::new(offset, length_to_request));
560
561 let mut download_status = self.shared.download_status.lock();
562
563 ranges_to_request.subtract_range_set(&download_status.downloaded);
564 ranges_to_request.subtract_range_set(&download_status.requested);
565
566 for &range in ranges_to_request.iter() {
567 self.stream_loader_command_tx
568 .send(StreamLoaderCommand::Fetch(range))
569 .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
570 }
571
572 let download_timeout = AudioFetchParams::get().download_timeout;
573 while !download_status.downloaded.contains(offset) {
574 if self
575 .shared
576 .cond
577 .wait_for(&mut download_status, download_timeout)
578 .timed_out()
579 {
580 return Err(io::Error::new(
581 io::ErrorKind::TimedOut,
582 Error::deadline_exceeded(AudioFileError::WaitTimeout),
583 ));
584 }
585 }
586 let available_length = download_status
587 .downloaded
588 .contained_length_from_value(offset);
589
590 drop(download_status);
591
592 self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
593 let read_len = min(length, available_length);
594 let read_len = self.read_file.read(&mut output[..read_len])?;
595
596 self.position += read_len as u64;
597 self.shared.set_read_position(self.position);
598
599 Ok(read_len)
600 }
601}
602
603impl Seek for AudioFileStreaming {
604 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
605 let current_position = self.position as i64;
608 let requested_pos = match pos {
609 SeekFrom::Start(pos) => pos as i64,
610 SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
611 SeekFrom::Current(pos) => current_position + pos,
612 };
613 if requested_pos == current_position {
614 return Ok(current_position as u64);
615 }
616
617 let available = self
619 .shared
620 .download_status
621 .lock()
622 .downloaded
623 .contains(requested_pos as usize);
624
625 let mut was_streaming = false;
626 if !available {
627 was_streaming = self.shared.is_download_streaming();
631 if was_streaming {
632 self.shared.set_download_streaming(false);
633 }
634 }
635
636 self.position = self.read_file.seek(pos)?;
637 self.shared.set_read_position(self.position);
638
639 if !available && was_streaming {
640 self.shared.set_download_streaming(true);
641 }
642
643 Ok(self.position)
644 }
645}
646
647impl Read for AudioFile {
648 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
649 match *self {
650 AudioFile::Cached(ref mut file) => file.read(output),
651 AudioFile::Streaming(ref mut file) => file.read(output),
652 }
653 }
654}
655
656impl Seek for AudioFile {
657 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
658 match *self {
659 AudioFile::Cached(ref mut file) => file.seek(pos),
660 AudioFile::Streaming(ref mut file) => file.seek(pos),
661 }
662 }
663}