mod receive;
use std::{
cmp::min,
fs,
io::{self, Read, Seek, SeekFrom},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, OnceLock,
},
time::Duration,
};
use futures_util::{future::IntoStream, StreamExt, TryFutureExt};
use hyper::{body::Incoming, header::CONTENT_RANGE, Response, StatusCode};
use hyper_util::client::legacy::ResponseFuture;
use parking_lot::{Condvar, Mutex};
use tempfile::NamedTempFile;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, Semaphore};
use librespot_core::{cdn_url::CdnUrl, Error, FileId, Session};
use self::receive::audio_file_fetch;
use crate::range_set::{Range, RangeSet};
pub type AudioFileResult = Result<(), librespot_core::Error>;
#[derive(Error, Debug)]
pub enum AudioFileError {
#[error("other end of channel disconnected")]
Channel,
#[error("required header not found")]
Header,
#[error("streamer received no data")]
NoData,
#[error("no output available")]
Output,
#[error("invalid status code {0}")]
StatusCode(StatusCode),
#[error("wait timeout exceeded")]
WaitTimeout,
}
impl From<AudioFileError> for Error {
fn from(err: AudioFileError) -> Self {
match err {
AudioFileError::Channel => Error::aborted(err),
AudioFileError::Header => Error::unavailable(err),
AudioFileError::NoData => Error::unavailable(err),
AudioFileError::Output => Error::aborted(err),
AudioFileError::StatusCode(_) => Error::failed_precondition(err),
AudioFileError::WaitTimeout => Error::deadline_exceeded(err),
}
}
}
#[derive(Clone)]
pub struct AudioFetchParams {
pub minimum_download_size: usize,
pub minimum_throughput: usize,
pub initial_ping_time_estimate: Duration,
pub maximum_assumed_ping_time: Duration,
pub read_ahead_before_playback: Duration,
pub read_ahead_during_playback: Duration,
pub prefetch_threshold_factor: f32,
pub download_timeout: Duration,
}
impl Default for AudioFetchParams {
fn default() -> Self {
let minimum_download_size = 64 * 1024;
let minimum_throughput = 8 * 1024;
Self {
minimum_download_size,
minimum_throughput,
initial_ping_time_estimate: Duration::from_millis(500),
maximum_assumed_ping_time: Duration::from_millis(1500),
read_ahead_before_playback: Duration::from_secs(1),
read_ahead_during_playback: Duration::from_secs(5),
prefetch_threshold_factor: 4.0,
download_timeout: Duration::from_secs(
(minimum_download_size / minimum_throughput) as u64,
),
}
}
}
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();
impl AudioFetchParams {
pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
AUDIO_FETCH_PARAMS.set(params)
}
pub fn get() -> &'static AudioFetchParams {
AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
}
}
pub enum AudioFile {
Cached(fs::File),
Streaming(AudioFileStreaming),
}
#[derive(Debug)]
pub struct StreamingRequest {
streamer: IntoStream<ResponseFuture>,
initial_response: Option<Response<Incoming>>,
offset: usize,
length: usize,
}
#[derive(Debug)]
pub enum StreamLoaderCommand {
Fetch(Range), Close, }
#[derive(Clone)]
pub struct StreamLoaderController {
channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
stream_shared: Option<Arc<AudioFileShared>>,
file_size: usize,
}
impl StreamLoaderController {
pub fn len(&self) -> usize {
self.file_size
}
pub fn is_empty(&self) -> bool {
self.file_size == 0
}
pub fn range_available(&self, range: Range) -> bool {
let available = if let Some(ref shared) = self.stream_shared {
let download_status = shared.download_status.lock();
range.length
<= download_status
.downloaded
.contained_length_from_value(range.start)
} else {
range.length <= self.len() - range.start
};
available
}
pub fn range_to_end_available(&self) -> bool {
match self.stream_shared {
Some(ref shared) => {
let read_position = shared.read_position();
self.range_available(Range::new(read_position, self.len() - read_position))
}
None => true,
}
}
pub fn ping_time(&self) -> Option<Duration> {
self.stream_shared.as_ref().map(|shared| shared.ping_time())
}
fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
if let Some(ref channel) = self.channel_tx {
let _ = channel.send(command);
}
}
pub fn fetch(&self, range: Range) {
self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
}
pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult {
if range.start >= self.len() {
range.length = 0;
} else if range.end() > self.len() {
range.length = self.len() - range.start;
}
self.fetch(range);
if let Some(ref shared) = self.stream_shared {
let mut download_status = shared.download_status.lock();
let download_timeout = AudioFetchParams::get().download_timeout;
while range.length
> download_status
.downloaded
.contained_length_from_value(range.start)
{
if shared
.cond
.wait_for(&mut download_status, download_timeout)
.timed_out()
{
return Err(AudioFileError::WaitTimeout.into());
}
if range.length
> (download_status
.downloaded
.union(&download_status.requested)
.contained_length_from_value(range.start))
{
self.fetch(range);
}
}
}
Ok(())
}
pub fn fetch_next_and_wait(
&self,
request_length: usize,
wait_length: usize,
) -> AudioFileResult {
match self.stream_shared {
Some(ref shared) => {
let start = shared.read_position();
let request_range = Range {
start,
length: request_length,
};
self.fetch(request_range);
let wait_range = Range {
start,
length: wait_length,
};
self.fetch_blocking(wait_range)
}
None => Ok(()),
}
}
pub fn set_random_access_mode(&self) {
if let Some(ref shared) = self.stream_shared {
shared.set_download_streaming(false)
}
}
pub fn set_stream_mode(&self) {
if let Some(ref shared) = self.stream_shared {
shared.set_download_streaming(true)
}
}
pub fn close(&self) {
self.send_stream_loader_command(StreamLoaderCommand::Close);
}
}
pub struct AudioFileStreaming {
read_file: fs::File,
position: u64,
stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
shared: Arc<AudioFileShared>,
}
struct AudioFileDownloadStatus {
requested: RangeSet,
downloaded: RangeSet,
}
struct AudioFileShared {
cdn_url: CdnUrl,
file_size: usize,
bytes_per_second: usize,
cond: Condvar,
download_status: Mutex<AudioFileDownloadStatus>,
download_streaming: AtomicBool,
download_slots: Semaphore,
ping_time_ms: AtomicUsize,
read_position: AtomicUsize,
throughput: AtomicUsize,
}
impl AudioFileShared {
fn is_download_streaming(&self) -> bool {
self.download_streaming.load(Ordering::Acquire)
}
fn set_download_streaming(&self, streaming: bool) {
self.download_streaming.store(streaming, Ordering::Release)
}
fn ping_time(&self) -> Duration {
let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire);
if ping_time_ms > 0 {
Duration::from_millis(ping_time_ms as u64)
} else {
AudioFetchParams::get().initial_ping_time_estimate
}
}
fn set_ping_time(&self, duration: Duration) {
self.ping_time_ms
.store(duration.as_millis() as usize, Ordering::Release)
}
fn throughput(&self) -> usize {
self.throughput.load(Ordering::Acquire)
}
fn set_throughput(&self, throughput: usize) {
self.throughput.store(throughput, Ordering::Release)
}
fn read_position(&self) -> usize {
self.read_position.load(Ordering::Acquire)
}
fn set_read_position(&self, position: u64) {
self.read_position
.store(position as usize, Ordering::Release)
}
}
impl AudioFile {
pub async fn open(
session: &Session,
file_id: FileId,
bytes_per_second: usize,
) -> Result<AudioFile, Error> {
if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
debug!("File {} already in cache", file_id);
return Ok(AudioFile::Cached(file));
}
debug!("Downloading file {}", file_id);
let (complete_tx, complete_rx) = oneshot::channel();
let streaming =
AudioFileStreaming::open(session.clone(), file_id, complete_tx, bytes_per_second);
let session_ = session.clone();
session.spawn(complete_rx.map_ok(move |mut file| {
debug!("Downloading file {} complete", file_id);
if let Some(cache) = session_.cache() {
if let Some(cache_id) = cache.file_path(file_id) {
if let Err(e) = cache.save_file(file_id, &mut file) {
error!("Error caching file {} to {:?}: {}", file_id, cache_id, e);
} else {
debug!("File {} cached to {:?}", file_id, cache_id);
}
}
}
}));
Ok(AudioFile::Streaming(streaming.await?))
}
pub fn get_stream_loader_controller(&self) -> Result<StreamLoaderController, Error> {
let controller = match self {
AudioFile::Streaming(ref stream) => StreamLoaderController {
channel_tx: Some(stream.stream_loader_command_tx.clone()),
stream_shared: Some(stream.shared.clone()),
file_size: stream.shared.file_size,
},
AudioFile::Cached(ref file) => StreamLoaderController {
channel_tx: None,
stream_shared: None,
file_size: file.metadata()?.len() as usize,
},
};
Ok(controller)
}
pub fn is_cached(&self) -> bool {
matches!(self, AudioFile::Cached { .. })
}
}
impl AudioFileStreaming {
pub async fn open(
session: Session,
file_id: FileId,
complete_tx: oneshot::Sender<NamedTempFile>,
bytes_per_second: usize,
) -> Result<AudioFileStreaming, Error> {
let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
if let Ok(url) = cdn_url.try_get_url() {
trace!("Streaming from {}", url);
}
let minimum_download_size = AudioFetchParams::get().minimum_download_size;
let mut streamer =
session
.spclient()
.stream_from_cdn(&cdn_url, 0, minimum_download_size)?;
let response = streamer.next().await.ok_or(AudioFileError::NoData)??;
let code = response.status();
if code != StatusCode::PARTIAL_CONTENT {
debug!(
"Opening audio file expected partial content but got: {}",
code
);
return Err(AudioFileError::StatusCode(code).into());
}
let header_value = response
.headers()
.get(CONTENT_RANGE)
.ok_or(AudioFileError::Header)?;
let str_value = header_value.to_str()?;
let hyphen_index = str_value.find('-').unwrap_or_default();
let slash_index = str_value.find('/').unwrap_or_default();
let upper_bound: usize = str_value[hyphen_index + 1..slash_index].parse()?;
let file_size = str_value[slash_index + 1..].parse()?;
let initial_request = StreamingRequest {
streamer,
initial_response: Some(response),
offset: 0,
length: upper_bound + 1,
};
let shared = Arc::new(AudioFileShared {
cdn_url,
file_size,
bytes_per_second,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_streaming: AtomicBool::new(false),
download_slots: Semaphore::new(1),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
throughput: AtomicUsize::new(0),
});
let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
write_file.as_file().set_len(file_size as u64)?;
let read_file = write_file.reopen()?;
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded_channel::<StreamLoaderCommand>();
session.spawn(audio_file_fetch(
session.clone(),
shared.clone(),
initial_request,
write_file,
stream_loader_command_rx,
complete_tx,
));
Ok(AudioFileStreaming {
read_file,
position: 0,
stream_loader_command_tx,
shared,
})
}
}
impl Read for AudioFileStreaming {
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
let offset = self.position as usize;
if offset >= self.shared.file_size {
return Ok(0);
}
let length = min(output.len(), self.shared.file_size - offset);
if length == 0 {
return Ok(0);
}
let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
let length_to_request = if self.shared.is_download_streaming() {
let length_to_request = length
+ (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
as usize;
min(length_to_request, self.shared.file_size - offset)
} else {
length
};
let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length_to_request));
let mut download_status = self.shared.download_status.lock();
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
for &range in ranges_to_request.iter() {
self.stream_loader_command_tx
.send(StreamLoaderCommand::Fetch(range))
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
}
let download_timeout = AudioFetchParams::get().download_timeout;
while !download_status.downloaded.contains(offset) {
if self
.shared
.cond
.wait_for(&mut download_status, download_timeout)
.timed_out()
{
return Err(io::Error::new(
io::ErrorKind::TimedOut,
Error::deadline_exceeded(AudioFileError::WaitTimeout),
));
}
}
let available_length = download_status
.downloaded
.contained_length_from_value(offset);
drop(download_status);
self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?;
let read_len = min(length, available_length);
let read_len = self.read_file.read(&mut output[..read_len])?;
self.position += read_len as u64;
self.shared.set_read_position(self.position);
Ok(read_len)
}
}
impl Seek for AudioFileStreaming {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let current_position = self.position as i64;
let requested_pos = match pos {
SeekFrom::Start(pos) => pos as i64,
SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
SeekFrom::Current(pos) => current_position + pos,
};
if requested_pos == current_position {
return Ok(current_position as u64);
}
let available = self
.shared
.download_status
.lock()
.downloaded
.contains(requested_pos as usize);
let mut was_streaming = false;
if !available {
was_streaming = self.shared.is_download_streaming();
if was_streaming {
self.shared.set_download_streaming(false);
}
}
self.position = self.read_file.seek(pos)?;
self.shared.set_read_position(self.position);
if !available && was_streaming {
self.shared.set_download_streaming(true);
}
Ok(self.position)
}
}
impl Read for AudioFile {
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
match *self {
AudioFile::Cached(ref mut file) => file.read(output),
AudioFile::Streaming(ref mut file) => file.read(output),
}
}
}
impl Seek for AudioFile {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
match *self {
AudioFile::Cached(ref mut file) => file.seek(pos),
AudioFile::Streaming(ref mut file) => file.seek(pos),
}
}
}