use std::{
fmt,
num::NonZeroI64,
ops::Deref,
str::FromStr,
sync::{Arc, Mutex, PoisonError},
time::{Duration, SystemTime},
};
use rodio::SampleRate;
use stream_download::{
self, StreamDownload, StreamHandle, StreamPhase, StreamState, http::HttpStream,
source::SourceStream, storage::StorageProvider,
};
use time::OffsetDateTime;
use url::Url;
use veil::Redact;
use crate::{
audio_file::AudioFile,
error::{Error, Result},
http,
protocol::{
self, Codec,
connect::AudioQuality,
gateway::{self, LivestreamUrls},
media::{self, Cipher, CipherFormat, Data, Format, Medium},
},
util::ToF32,
};
pub const DEFAULT_SAMPLE_RATE: SampleRate = 44_100;
pub const DEFAULT_BITS_PER_SAMPLE: u32 = 16;
#[expect(clippy::module_name_repetitions)]
pub type TrackId = NonZeroI64;
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[expect(clippy::module_name_repetitions)]
pub enum TrackType {
#[default]
Song,
Episode,
Livestream,
}
impl TrackType {
const STEREO: u16 = 2;
const MONO: u16 = 1;
#[must_use]
pub fn default_channels(&self) -> u16 {
match self {
Self::Song | Self::Livestream => Self::STEREO,
Self::Episode => Self::MONO,
}
}
}
#[derive(Clone, Redact, Eq, PartialEq)]
#[redact(all, variant)]
pub enum ExternalUrl {
Direct(Url),
WithQuality(gateway::LivestreamUrls),
}
impl fmt::Display for TrackType {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Song => write!(f, "song"),
Self::Episode => write!(f, "episode"),
Self::Livestream => write!(f, "livestream"),
}
}
}
impl FromStr for TrackType {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s {
"song" => Ok(Self::Song),
"episode" => Ok(Self::Episode),
"livestream" => Ok(Self::Livestream),
_ => Err(Error::invalid_argument(format!("unknown track type: {s}"))),
}
}
}
#[derive(Debug)]
pub struct Track {
typ: TrackType,
id: TrackId,
token: Option<String>,
external: bool,
external_url: Option<ExternalUrl>,
title: Option<String>,
artist: String,
album_title: Option<String>,
cover_id: String,
gain: Option<f32>,
expiry: Option<SystemTime>,
quality: AudioQuality,
duration: Option<Duration>,
buffered: Arc<Mutex<Option<Duration>>>,
file_size: Option<u64>,
cipher: Cipher,
handle: Option<StreamHandle>,
available: bool,
bitrate: Option<usize>,
codec: Option<Codec>,
pub sample_rate: Option<SampleRate>,
pub bits_per_sample: Option<u32>,
pub channels: Option<u16>,
fallback: Option<Box<Self>>,
}
struct StreamUrl {
stream: HttpStream<reqwest::Client>,
url: reqwest::Url,
}
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum MediumType {
Primary(Medium),
Fallback(Medium),
}
impl Deref for MediumType {
type Target = Medium;
#[inline]
fn deref(&self) -> &Self::Target {
match self {
Self::Primary(medium) | Self::Fallback(medium) => medium,
}
}
}
impl Track {
pub const PREFETCH_DURATION: Duration = Duration::from_secs(3);
const PREFETCH_DEFAULT: usize = 60 * 1024;
#[must_use]
#[inline]
pub fn id(&self) -> TrackId {
self.id
}
#[must_use]
#[inline]
pub fn duration(&self) -> Option<Duration> {
self.duration
}
#[must_use]
#[inline]
pub fn available(&self) -> bool {
self.available
}
#[must_use]
#[inline]
pub fn typ(&self) -> TrackType {
self.typ
}
#[must_use]
#[inline]
pub fn gain(&self) -> Option<f32> {
self.gain
}
#[must_use]
#[inline]
pub fn title(&self) -> Option<&str> {
self.title.as_deref()
}
#[must_use]
#[inline]
pub fn artist(&self) -> &str {
&self.artist
}
#[must_use]
#[inline]
pub fn album_title(&self) -> Option<&str> {
self.album_title.as_deref()
}
#[must_use]
#[inline]
pub fn cover_id(&self) -> &str {
&self.cover_id
}
#[must_use]
#[inline]
pub fn expiry(&self) -> Option<SystemTime> {
self.expiry
}
#[must_use]
#[inline]
pub fn is_livestream(&self) -> bool {
self.typ == TrackType::Livestream
}
#[must_use]
pub fn buffered(&self) -> Option<Duration> {
*self.buffered.lock().unwrap_or_else(PoisonError::into_inner)
}
#[must_use]
#[inline]
pub fn quality(&self) -> AudioQuality {
self.quality
}
#[must_use]
#[inline]
pub fn cipher(&self) -> Cipher {
self.cipher
}
#[must_use]
#[inline]
pub fn is_encrypted(&self) -> bool {
self.cipher != Cipher::NONE
}
#[must_use]
#[inline]
pub fn is_lossless(&self) -> bool {
self.codec().is_some_and(|codec| codec == Codec::FLAC)
}
#[must_use]
#[inline]
pub fn is_podcast(&self) -> bool {
self.typ == TrackType::Episode
}
const BF_CBC_STRIPE_MP3_64: CipherFormat = CipherFormat {
cipher: Cipher::BF_CBC_STRIPE,
format: Format::MP3_64,
};
const BF_CBC_STRIPE_MP3_128: CipherFormat = CipherFormat {
cipher: Cipher::BF_CBC_STRIPE,
format: Format::MP3_128,
};
const BF_CBC_STRIPE_MP3_320: CipherFormat = CipherFormat {
cipher: Cipher::BF_CBC_STRIPE,
format: Format::MP3_320,
};
const BF_CBC_STRIPE_MP3_MISC: CipherFormat = CipherFormat {
cipher: Cipher::BF_CBC_STRIPE,
format: Format::MP3_MISC,
};
const BF_CBC_STRIPE_FLAC: CipherFormat = CipherFormat {
cipher: Cipher::BF_CBC_STRIPE,
format: Format::FLAC,
};
const CIPHER_FORMATS_MP3_64: [CipherFormat; 2] =
[Self::BF_CBC_STRIPE_MP3_64, Self::BF_CBC_STRIPE_MP3_MISC];
const CIPHER_FORMATS_MP3_128: [CipherFormat; 3] = [
Self::BF_CBC_STRIPE_MP3_128,
Self::BF_CBC_STRIPE_MP3_64,
Self::BF_CBC_STRIPE_MP3_MISC,
];
const CIPHER_FORMATS_MP3_320: [CipherFormat; 4] = [
Self::BF_CBC_STRIPE_MP3_320,
Self::BF_CBC_STRIPE_MP3_128,
Self::BF_CBC_STRIPE_MP3_64,
Self::BF_CBC_STRIPE_MP3_MISC,
];
const CIPHER_FORMATS_FLAC: [CipherFormat; 5] = [
Self::BF_CBC_STRIPE_FLAC,
Self::BF_CBC_STRIPE_MP3_320,
Self::BF_CBC_STRIPE_MP3_128,
Self::BF_CBC_STRIPE_MP3_64,
Self::BF_CBC_STRIPE_MP3_MISC,
];
const MEDIA_ENDPOINT: &'static str = "v1/get_url";
fn get_external_medium(&self, quality: AudioQuality) -> Result<MediumType> {
let external_url = self.external_url.as_ref().ok_or_else(|| {
Error::unavailable(format!("external {} {self} has no urls", self.typ))
})?;
let sources = match external_url {
ExternalUrl::Direct(url) => {
vec![media::Source {
url: url.clone(),
provider: String::default(),
}]
}
ExternalUrl::WithQuality(codec_urls) => {
let mut urls = Vec::new();
for (bitrate, codec_url) in codec_urls.sort_by_bitrate().into_iter().rev() {
if quality.bitrate().is_none_or(|kbps| bitrate <= kbps) {
if let Some(url) = codec_url.aac.or(codec_url.mp3) {
urls.push(media::Source {
url,
provider: String::default(),
});
}
}
}
urls
}
};
if sources.is_empty() {
return Err(Error::unavailable(format!(
"no valid sources found for external {} {self}",
self.typ
)));
}
let medium = Medium {
format: Format::EXTERNAL,
cipher: media::CipherType { typ: Cipher::NONE },
sources,
not_before: None,
expiry: None,
media_type: media::Type::FULL,
};
Ok(MediumType::Primary(medium))
}
pub async fn get_medium(
&self,
client: &http::Client,
media_url: &Url,
quality: AudioQuality,
license_token: impl Into<String>,
) -> Result<MediumType> {
if !self.available() {
return Err(Error::unavailable(format!(
"{} {self} is not available for download",
self.typ
)));
}
if let Some(expiry) = self.expiry {
if expiry <= SystemTime::now() {
return Err(Error::unavailable(format!(
"{} {self} has expired since {}",
self.typ,
OffsetDateTime::from(expiry)
)));
}
}
if self.external {
return self.get_external_medium(quality);
}
let track_token = self.token.as_ref().ok_or_else(|| {
Error::permission_denied(format!("{} {self} does not have a track token", self.typ))
})?;
let mut track_tokens = vec![track_token.to_owned()];
if let Some(fallback) = &self.fallback {
if let Some(fallback_token) = fallback.token.as_ref() {
track_tokens.push(fallback_token.to_owned());
}
}
let cipher_formats = match quality {
AudioQuality::Basic => Self::CIPHER_FORMATS_MP3_64.to_vec(),
AudioQuality::Standard => Self::CIPHER_FORMATS_MP3_128.to_vec(),
AudioQuality::High => Self::CIPHER_FORMATS_MP3_320.to_vec(),
AudioQuality::Lossless => Self::CIPHER_FORMATS_FLAC.to_vec(),
AudioQuality::Unknown => {
return Err(Error::unknown(format!(
"unknown audio quality for {} {self}",
self.typ
)));
}
};
let request = media::Request {
license_token: license_token.into(),
track_tokens,
media: vec![media::Media {
typ: media::Type::FULL,
cipher_formats,
}],
};
let get_url = media_url.join(Self::MEDIA_ENDPOINT)?;
let body = serde_json::to_string(&request)?;
let request = client.json(get_url, body);
let response = client.execute(request).await?;
let body = response.text().await?;
let items: media::Response = protocol::json(&body, Self::MEDIA_ENDPOINT)?;
let mut result = None;
for i in 0..items.data.len() {
if let Data::Media { media } = &items.data[i] {
if let Some(medium) = media.first().cloned() {
let medium_type = if i == 0 {
MediumType::Primary(medium)
} else {
MediumType::Fallback(medium)
};
result = Some(medium_type);
break;
}
}
}
let result = result
.ok_or_else(|| Error::not_found(format!("no media data for {} {self}", self.typ)))?;
let available_quality = AudioQuality::from(result.format);
if !self.is_user_uploaded() && !self.is_external() && quality != available_quality {
warn!(
"requested {} {self} in {}, but got {}",
self.typ, quality, available_quality
);
}
Ok(result)
}
#[must_use]
#[inline]
pub fn is_user_uploaded(&self) -> bool {
self.id.is_negative()
}
#[must_use]
#[inline]
pub fn is_deezer(&self) -> bool {
self.typ == TrackType::Song && !self.is_user_uploaded()
}
#[must_use]
#[inline]
pub fn is_cbr(&self) -> bool {
self.is_deezer() && !self.is_lossless()
}
async fn open_stream(&self, client: &http::Client, medium: &Medium) -> Result<StreamUrl> {
let now = SystemTime::now();
for source in &medium.sources {
let Some(host_str) = source.url.host_str() else {
warn!("skipping source with invalid host for {} {self}", self.typ);
continue;
};
if let Some(not_before) = medium.not_before {
if not_before > now {
warn!(
"{} {self} is not available for download until {} from {host_str}",
self.typ,
OffsetDateTime::from(not_before)
);
continue;
}
}
if let Some(expiry) = medium.expiry {
if expiry <= now {
warn!(
"{} {self} is no longer available for download since {} from {host_str}",
self.typ,
OffsetDateTime::from(expiry)
);
continue;
}
}
match HttpStream::new(client.unlimited.clone(), source.url.clone()).await {
Ok(stream) => {
debug!("starting download of {} {self} from {host_str}", self.typ);
return Ok(StreamUrl {
stream,
url: source.url.clone(),
});
}
Err(err) => {
warn!(
"failed to start download of {} {self} from {host_str}: {err}",
self.typ
);
}
}
}
Err(Error::unavailable(format!(
"no valid sources found for {} {self}",
self.typ
)))
}
fn init_download(&mut self, url: &Url) {
if let Some(ExternalUrl::WithQuality(urls)) = &self.external_url {
let result = find_codec_bitrate(urls, url);
self.codec = result.map(|some| some.0);
self.bitrate = result.map(|some| some.1);
} else {
if let Some(ExternalUrl::Direct(url)) = &self.external_url {
if let Some(extension) = url.path().split('.').next_back() {
if let Ok(codec) = extension.parse() {
self.codec = Some(codec);
}
}
} else if self.is_user_uploaded() {
self.codec = Some(Codec::MP3);
} else {
self.codec = self.quality.codec();
}
self.bitrate = match self.quality {
AudioQuality::Lossless | AudioQuality::Unknown => {
self.file_size
.unwrap_or_default()
.checked_div(self.duration.unwrap_or_default().as_secs())
.map(|bytes| {
let mut kbps = usize::try_from(bytes * 8 / 1000).unwrap_or(usize::MAX);
let max_bitrate = match self.codec() {
Some(Codec::ADTS | Codec::MP4) => 576,
Some(Codec::MP3) => 320,
Some(Codec::FLAC) => 1411,
Some(Codec::WAV) => 3072,
None => usize::MAX,
};
kbps = kbps.min(max_bitrate);
kbps
})
}
_ => self.quality.bitrate(),
};
}
}
pub async fn start_download<P>(
&mut self,
client: &http::Client,
medium: &MediumType,
storage: P,
) -> Result<AudioFile>
where
P: StorageProvider + Sync + 'static,
P::Reader: Sync,
{
let medium = match medium {
MediumType::Primary(medium) => medium,
MediumType::Fallback(medium) => {
if let Some(fallback) = &mut self.fallback {
warn!("falling back {} {} to {fallback}", self.typ, self.id);
std::mem::swap(&mut self.id, &mut fallback.id);
std::mem::swap(&mut self.artist, &mut fallback.artist);
std::mem::swap(&mut self.album_title, &mut fallback.album_title);
std::mem::swap(&mut self.cover_id, &mut fallback.cover_id);
std::mem::swap(&mut self.duration, &mut fallback.duration);
std::mem::swap(&mut self.title, &mut fallback.title);
std::mem::swap(&mut self.gain, &mut fallback.gain);
std::mem::swap(&mut self.token, &mut fallback.token);
std::mem::swap(&mut self.expiry, &mut fallback.expiry);
}
medium
}
};
let stream_url = self.open_stream(client, medium).await?;
let stream = stream_url.stream;
let url = stream_url.url;
self.quality = medium.format.into();
self.cipher = medium.cipher.typ;
if let Some(file_size) = stream.content_length() {
if file_size > 0 {
info!("downloading {file_size} bytes for {} {self}", self.typ);
self.file_size = Some(file_size);
} else {
return Err(Error::data_loss(format!("{} is 0 bytes", self.typ)));
}
} else {
info!("downloading {} {self} with unknown file size", self.typ);
}
self.init_download(&url);
let prefetch_size = self.prefetch_size().try_into()?;
trace!(
"prefetch size for {} {self}: {prefetch_size} bytes",
self.typ
);
let track_str = self.to_string();
let track_typ = self.typ.to_string();
let duration = self.duration;
let buffered = Arc::clone(&self.buffered);
let file_size = self.file_size;
let callback = move |_: &HttpStream<_>,
stream: StreamState,
_: &tokio_util::sync::CancellationToken| {
match stream.phase {
StreamPhase::Complete => {
info!("completed download of {track_typ} {track_str}");
*buffered.lock().unwrap() = duration;
}
StreamPhase::Downloading { .. } => {
if let Some(file_size) = file_size {
if file_size > 0 {
#[expect(clippy::cast_precision_loss)]
let progress = stream.current_position as f64 / file_size as f64;
*buffered.lock().unwrap() = duration.map(|duration| {
duration
.mul_f64(progress)
.saturating_sub(Self::PREFETCH_DURATION)
});
}
}
}
_ => {
}
}
};
let download = StreamDownload::from_stream(
stream,
storage,
stream_download::Settings::default()
.on_progress(callback)
.prefetch_bytes(prefetch_size)
.cancel_on_drop(true),
)
.await?;
self.handle = Some(download.handle());
AudioFile::try_from_download(self, download)
}
#[must_use]
#[inline]
pub fn handle(&self) -> Option<StreamHandle> {
self.handle.clone()
}
#[must_use]
#[inline]
pub fn is_complete(&self) -> bool {
self.buffered() >= self.duration
}
pub fn reset_download(&mut self) {
self.handle = None;
self.file_size = None;
*self.buffered.lock().unwrap() = None;
}
#[must_use]
#[inline]
pub fn file_size(&self) -> Option<u64> {
self.file_size
}
#[must_use]
#[inline]
pub fn is_external(&self) -> bool {
self.external
}
#[must_use]
#[inline]
pub fn bitrate(&self) -> Option<usize> {
self.bitrate
}
#[must_use]
#[inline]
pub fn codec(&self) -> Option<Codec> {
self.codec
}
#[must_use]
pub fn prefetch_size(&self) -> usize {
let mut prefetch_size = Self::PREFETCH_DEFAULT;
if let Some(kbps) = self.bitrate {
prefetch_size = (kbps * 1000 / 8)
* Self::PREFETCH_DURATION
.as_secs()
.try_into()
.unwrap_or(usize::MAX);
}
prefetch_size
}
}
impl From<gateway::ListData> for Track {
fn from(item: gateway::ListData) -> Self {
let (gain, album_title) = if let gateway::ListData::Song {
gain, album_title, ..
} = &item
{
(gain.as_ref(), Some(album_title))
} else {
(None, None)
};
let (available, external, external_url, fallback) = match &item {
gateway::ListData::Song { fallback, .. } => (true, false, None, fallback.clone()),
gateway::ListData::Episode {
available,
external,
external_url,
..
} => (
*available,
*external,
external_url.clone().map(ExternalUrl::Direct),
None,
),
gateway::ListData::Livestream {
available,
external_urls,
..
} => (
*available,
true,
Some(ExternalUrl::WithQuality(external_urls.clone())),
None,
),
};
let typ = item.typ().parse().unwrap_or_default();
Self {
typ,
id: item.id(),
token: item.token().map(ToOwned::to_owned),
title: item.title().map(ToOwned::to_owned),
artist: item.artist().to_owned(),
album_title: album_title.map(ToString::to_string),
cover_id: item.cover_id().to_owned(),
duration: item.duration(),
gain: gain.map(|gain| gain.to_f32_lossy()),
expiry: item.expiry(),
quality: AudioQuality::Unknown,
buffered: Arc::new(Mutex::new(None)),
file_size: None,
cipher: Cipher::BF_CBC_STRIPE,
handle: None,
available,
external,
external_url,
bitrate: None,
codec: None,
sample_rate: None,
bits_per_sample: None,
channels: None,
fallback: fallback.map(|boxed| Box::new((*boxed).into())),
}
}
}
impl fmt::Display for Track {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let artist = self.artist();
if let Some(title) = &self.title() {
write!(f, "{}: \"{} - {}\"", self.id, artist, title)
} else {
write!(f, "{}: \"{}\"", self.id, artist)
}
}
}
fn find_codec_bitrate(haystack: &LivestreamUrls, needle: &Url) -> Option<(Codec, usize)> {
for (kbps, codec) in &haystack.data {
if codec.aac.as_ref().is_some_and(|aac| aac == needle) {
return Some((Codec::ADTS, kbps.parse().ok()?));
} else if codec.mp3.as_ref().is_some_and(|mp3| mp3 == needle) {
return Some((Codec::MP3, kbps.parse().ok()?));
}
}
None
}