use std::{collections::HashSet, sync::Arc, time::Duration};
use cpal::traits::{DeviceTrait, HostTrait};
use md5::{Digest, Md5};
use rodio::Source;
use stream_download::storage::{adaptive::AdaptiveStorageProvider, temp::TempStorageProvider};
use url::Url;
use crate::{
config::Config,
decoder::Decoder,
decrypt::{self},
error::{Error, ErrorKind, Result},
events::Event,
http, normalize,
protocol::{
connect::{
Percentage,
contents::{AudioQuality, RepeatMode},
},
gateway::{self, MediaUrl},
},
track::{Track, TrackId},
util::{self, ToF32, UNITY_GAIN},
};
/// Sample type of the sources queue held by the player (see `Player::sources`).
pub type SampleFormat = f32;
/// Audio player that owns the track queue, the output device and the
/// playback state (position, volume, repeat mode, normalization).
pub struct Player {
/// Audio quality passed to `Track::get_medium` when a track is loaded.
audio_quality: AudioQuality,
/// License token passed to `Track::get_medium` when a track is loaded.
license_token: String,
/// Tracks to play; `position` indexes into this list.
queue: Vec<Track>,
/// Ids of tracks that failed to load; `run` skips over them.
skip_tracks: HashSet<TrackId>,
/// Index of the current track in `queue`.
position: usize,
/// Seek position recorded when a seek could not be performed yet; it is
/// taken and applied by the next `load_track` call.
deferred_seek: Option<Duration>,
/// HTTP client used to resolve media and start downloads.
client: http::Client,
/// Current repeat mode.
repeat_mode: RepeatMode,
/// Whether `load_track` applies gain normalization.
normalization: bool,
/// Normalization target in dB.
gain_target_db: i8,
/// Volume as requested by the caller; it is scaled by `log_volume` before
/// being applied to the sink.
volume: Percentage,
/// Optional channel on which player events are sent.
event_tx: Option<tokio::sync::mpsc::UnboundedSender<Event>>,
/// Output device specification, parsed by `get_device` when the device is
/// opened.
device: String,
/// Output sink; `Some` while the output device is open.
sink: Option<rodio::Sink>,
/// Output stream belonging to `sink`; stored alongside it while the device
/// is open.
stream: Option<rodio::OutputStream>,
/// Input side of the queue appended to `sink`; decoded tracks are added here.
sources: Option<Arc<rodio::queue::SourcesQueueInput<SampleFormat>>>,
/// Sink position recorded when the current track started; subtracted from
/// the sink position to obtain the progress within the current track.
playing_since: Duration,
/// Receiver obtained when the current track was queued; `run` advances to
/// the next track once it yields a message.
current_rx: Option<std::sync::mpsc::Receiver<()>>,
/// Same as `current_rx`, for the preloaded next track.
preload_rx: Option<std::sync::mpsc::Receiver<()>>,
/// Media URL passed to `Track::get_medium` when a track is loaded.
media_url: Url,
}
impl Player {
/// Volume a new player starts with: unity gain.
const DEFAULT_VOLUME: Percentage = Percentage::from_ratio(UNITY_GAIN);
/// Divisor applied to the exponential in `log_volume`.
const LOG_VOLUME_SCALE_FACTOR: f32 = 1000.0;
/// Exponent multiplier in `log_volume`; numerically ln(1000), so the curve
/// approaches unity as the input approaches 1.0.
const LOG_VOLUME_GROWTH_RATE: f32 = 6.907_755_4;
/// Total duration of the volume ramp performed by `ramp_volume`.
const FADE_DURATION: Duration = Duration::from_millis(25);
/// Creates a player for the given output device specification.
///
/// The output device is not opened here; `device` is only stored. When the
/// configuration carries no `bf_secret`, one is fetched through
/// `Config::try_key`. The secret is installed for decryption only if its
/// MD5 digest matches `Config::BF_SECRET_MD5`.
///
/// # Errors
///
/// Returns a permission-denied error when the secret's digest does not
/// match, and propagates errors from creating the HTTP client, fetching the
/// secret or installing it.
pub async fn new(config: &Config, device: &str) -> Result<Self> {
    let client = http::Client::without_cookies(config)?;

    let bf_secret = match config.bf_secret {
        Some(secret) => secret,
        None => {
            debug!("no bf_secret specified, fetching one from the web player");
            Config::try_key(&client).await?
        }
    };

    // Refuse to continue with a secret that does not match the known digest.
    let digest = format!("{:x}", Md5::digest(*bf_secret));
    if digest != Config::BF_SECRET_MD5 {
        return Err(Error::permission_denied("the bf_secret is not valid"));
    }
    decrypt::set_bf_secret(bf_secret)?;

    #[expect(clippy::cast_possible_truncation)]
    let gain_target_db = gateway::user_data::Gain::default().target as i8;

    Ok(Self {
        audio_quality: AudioQuality::default(),
        license_token: String::new(),
        queue: Vec::new(),
        skip_tracks: HashSet::new(),
        position: 0,
        deferred_seek: None,
        client,
        repeat_mode: RepeatMode::default(),
        normalization: config.normalization,
        gain_target_db,
        volume: Self::DEFAULT_VOLUME,
        event_tx: None,
        device: device.to_owned(),
        sink: None,
        stream: None,
        sources: None,
        playing_since: Duration::ZERO,
        current_rx: None,
        preload_rx: None,
        media_url: MediaUrl::default().into(),
    })
}
fn get_device(device: &str) -> Result<(rodio::Device, rodio::SupportedStreamConfig)> {
let mut components = device.split('|');
let host = match components.next() {
Some("") | None => cpal::default_host(),
Some(name) => {
let host_ids = cpal::available_hosts();
host_ids
.into_iter()
.find_map(|host_id| {
let host = cpal::host_from_id(host_id).ok()?;
if host.id().name().eq_ignore_ascii_case(name) {
Some(host)
} else {
None
}
})
.ok_or_else(|| Error::not_found(format!("audio host {name} not found")))?
}
};
let device = match components.next() {
Some("") | None => host.default_output_device().ok_or_else(|| {
Error::not_found(format!(
"default audio output device not found on {}",
host.id().name()
))
})?,
Some(name) => {
let mut devices = host.output_devices()?;
devices
.find(|device| device.name().is_ok_and(|n| n.eq_ignore_ascii_case(name)))
.ok_or_else(|| {
Error::not_found(format!(
"audio output device {name} not found on {}",
host.id().name()
))
})?
}
};
let config = match components.next() {
Some("") | None => device.default_output_config().map_err(|e| {
Error::unavailable(format!("default output configuration unavailable: {e}"))
})?,
Some(rate) => {
let rate = rate
.parse()
.map_err(|_| Error::invalid_argument(format!("invalid sample rate {rate}")))?;
let rate = cpal::SampleRate(rate);
let format = match components.next() {
Some("") | None => None,
other => other,
};
device
.supported_output_configs()?
.find_map(|config| {
if format.is_none_or(|format| {
config
.sample_format()
.to_string()
.eq_ignore_ascii_case(format)
}) {
config.try_with_sample_rate(rate)
} else {
None
}
})
.ok_or_else(|| {
Error::unavailable(format!(
"audio output device {} does not support sample rate {} with {} sample format",
device.name().as_deref().unwrap_or("UNKNOWN"),
rate.0,
format.unwrap_or("default")
))
})?
}
};
info!(
"audio output device: {} on {}",
device.name().as_deref().unwrap_or("UNKNOWN"),
host.id().name()
);
#[expect(clippy::cast_precision_loss)]
let sample_rate = config.sample_rate().0 as f32 / 1000.0;
info!(
"audio output configuration: {sample_rate:.1} kHz in {}",
config.sample_format()
);
trace!("audio buffer size: {:#?}", config.buffer_size());
Ok((device, config))
}
/// Opens the output device and prepares a paused sink with an empty
/// sources queue. Does nothing when the device is already open.
///
/// # Errors
///
/// Propagates errors from resolving the device, opening the output stream
/// or creating the sink.
pub fn start(&mut self) -> Result<()> {
    if !self.is_started() {
        debug!("opening output device");
        let (device, device_config) = Self::get_device(&self.device)?;
        let (stream, handle) =
            rodio::OutputStream::try_from_device_config(&device, device_config)?;

        let sink = rodio::Sink::try_new(&handle)?;
        sink.set_volume(Self::log_volume(self.volume.as_ratio()));

        // The sink plays from a single queue to which tracks are appended.
        let (sources, output) = rodio::queue::queue(true);
        sink.append(output);
        sink.pause();

        self.sink = Some(sink);
        self.sources = Some(sources);
        self.stream = Some(stream);
    }
    Ok(())
}
/// Stops the sink, if any, and releases the sources queue, the output
/// stream and the sink (in that order).
pub fn stop(&mut self) {
    if let Some(sink) = self.sink.as_ref() {
        debug!("closing output device");
        sink.stop();
    }

    self.sources = None;
    self.stream = None;
    self.sink = None;
}
/// Sample rates (in Hz) that `enumerate_devices` probes for each device.
const SAMPLE_RATES: [u32; 2] = [44_100, 48_000];
/// Sample formats that `enumerate_devices` lists.
const SAMPLE_FORMATS: [cpal::SampleFormat; 3] = [
cpal::SampleFormat::I16,
cpal::SampleFormat::I32,
cpal::SampleFormat::F32,
];
/// Lists the output configurations that can be used as a device
/// specification.
///
/// Each entry has the form `host|device|sample rate|sample format`. Only
/// two-channel configurations in one of `SAMPLE_FORMATS` that support one
/// of `SAMPLE_RATES` are listed. Hosts and devices that cannot be queried
/// are skipped silently.
#[must_use]
pub fn enumerate_devices() -> Vec<String> {
    let mut lines = Vec::new();

    for host_id in cpal::available_hosts() {
        let Ok(host) = cpal::host_from_id(host_id) else {
            continue;
        };
        let Ok(devices) = host.output_devices() else {
            continue;
        };

        for device in devices {
            let Ok(device_name) = device.name() else {
                continue;
            };
            let Ok(configs) = device.supported_output_configs() else {
                continue;
            };

            let candidates = configs.filter(|range| {
                range.channels() == 2 && Self::SAMPLE_FORMATS.contains(&range.sample_format())
            });
            for range in candidates {
                lines.extend(
                    Self::SAMPLE_RATES
                        .iter()
                        .filter_map(|&rate| range.try_with_sample_rate(cpal::SampleRate(rate)))
                        .map(|config| {
                            format!(
                                "{}|{}|{}|{}",
                                host.id().name(),
                                device_name,
                                config.sample_rate().0,
                                config.sample_format(),
                            )
                        }),
                );
            }
        }
    }

    lines
}
/// Advances the playlist position after the current track has ended.
///
/// With repeat mode `One` the position is kept. At the end of the queue the
/// position wraps to the start, and playback is paused unless the repeat
/// mode is `All`; no events are sent from here in that case. Otherwise
/// `TrackChanged` is sent when the position changed, and `Play` when the
/// player is playing.
fn go_next(&mut self) {
    let previous = self.position;
    let repeat_mode = self.repeat_mode();

    if repeat_mode != RepeatMode::One {
        let candidate = self.position.saturating_add(1);
        if candidate >= self.queue.len() {
            // End of the queue: wrap around.
            self.set_position(0);
            if repeat_mode != RepeatMode::All {
                self.pause();
            }
            return;
        }
        self.position = candidate;
    }

    if self.position() != previous {
        self.notify(Event::TrackChanged);
    }
    if self.is_playing() {
        self.notify(Event::Play);
    }
}
/// Attack time passed to `normalize::normalize`.
const NORMALIZE_ATTACK_TIME: Duration = Duration::from_millis(5);
/// Release time passed to `normalize::normalize`.
const NORMALIZE_RELEASE_TIME: Duration = Duration::from_millis(100);
/// Threshold (dB) passed to `normalize::normalize`.
const NORMALIZE_THRESHOLD_DB: f32 = -1.0;
/// Knee width (dB) passed to `normalize::normalize`.
const NORMALIZE_KNEE_WIDTH_DB: f32 = 4.0;
/// Time limit for resolving the medium and starting the download of a track.
const NETWORK_TIMEOUT: Duration = Duration::from_secs(2);
/// Reference level used to convert a decoder's replay gain into a track
/// loudness in `load_track`.
const REPLAY_GAIN_LUFS: i8 = -18;
/// Starts the download of the track at `position`, decodes it and appends
/// it to the sources queue.
///
/// Returns `Ok(Some(rx))` with the receiver obtained from queueing the
/// track, or `Ok(None)` when the track already has a download handle and
/// nothing was queued.
///
/// # Errors
///
/// Returns not-found when `position` is outside the queue and unavailable
/// when the sources queue does not exist. Timeouts, download errors and
/// decoder errors are propagated.
async fn load_track(
&mut self,
position: usize,
) -> Result<Option<std::sync::mpsc::Receiver<()>>> {
let track = self
.queue
.get_mut(position)
.ok_or_else(|| Error::not_found(format!("track at position {position} not found")))?;
let sources = self
.sources
.as_mut()
.ok_or_else(|| Error::unavailable("audio sources not available"))?;
// Only load tracks that have no download in progress.
if track.handle().is_none() {
// Resolving the medium and starting the download share one time limit.
let download = tokio::time::timeout(Self::NETWORK_TIMEOUT, async {
let medium = track
.get_medium(
&self.client,
&self.media_url,
self.audio_quality,
self.license_token.clone(),
)
.await?;
let prefetch_size = usize::try_from(track.prefetch_size()).unwrap_or(usize::MAX);
let storage = AdaptiveStorageProvider::new(
TempStorageProvider::default(),
prefetch_size
.try_into()
.map_err(|e| Error::internal(format!("prefetch size error: {e}")))?,
);
track.start_download(&self.client, &medium, storage).await
})
.await??;
let mut decoder = Decoder::new(track, download)?;
// Record the stream parameters reported by the decoder on the track.
track.sample_rate = Some(decoder.sample_rate());
track.channels = Some(decoder.channels());
if let Some(bits_per_sample) = decoder.bits_per_sample() {
track.bits_per_sample = Some(bits_per_sample);
}
// Apply a seek that was requested before the track could be loaded.
// A failed seek is only logged; the track then plays from the start.
if let Some(progress) = self.deferred_seek.take() {
if !progress.is_zero() {
if let Err(e) = decoder.try_seek(progress) {
error!("failed to seek to deferred position: {e}");
}
}
}
// Gain difference in dB between the target and the track; zero means
// no normalization is applied.
let mut difference = 0.0;
if self.normalization {
match track.gain() {
Some(gain) => difference = f32::from(self.gain_target_db) - gain,
None => {
// Fall back to the replay gain reported by the decoder.
if let Some(replay_gain) = decoder.replay_gain() {
debug!("track replay gain: {replay_gain:.1} dB");
let track_lufs = f32::from(Self::REPLAY_GAIN_LUFS) - replay_gain;
difference = f32::from(self.gain_target_db) - track_lufs;
} else {
warn!(
"{} {track} has no gain information, skipping normalization",
track.typ()
);
}
}
}
};
// Queue the decoder directly, or wrapped in the normalizer.
let rx = if difference == 0.0 {
sources.append_with_signal(decoder)
} else {
let ratio = util::db_to_ratio(difference);
debug!(
"normalizing {} {track} by {difference:.1} dB ({})",
track.typ(),
Percentage::from_ratio(ratio)
);
let normalized = normalize::normalize(
decoder,
ratio,
Self::NORMALIZE_THRESHOLD_DB,
Self::NORMALIZE_KNEE_WIDTH_DB,
Self::NORMALIZE_ATTACK_TIME,
Self::NORMALIZE_RELEASE_TIME,
);
sources.append_with_signal(normalized)
};
let sample_rate = track.sample_rate.map_or("unknown".to_string(), |rate| {
(rate.to_f32_lossy() / 1000.).to_string()
});
let codec = track
.codec()
.map_or("unknown".to_string(), |codec| codec.to_string());
let bitrate = track
.bitrate()
.map_or("unknown".to_string(), |kbps| kbps.to_string());
debug!(
"loaded {} {track}; codec: {codec}; sample rate: {sample_rate} kHz; bitrate: {bitrate} kbps; channels: {}",
track.typ(),
track
.channels
.unwrap_or_else(|| track.typ().default_channels())
);
return Ok(Some(rx));
}
Ok(None)
}
#[must_use]
fn get_pos(&self) -> Duration {
self.sink
.as_ref()
.map_or(Duration::ZERO, rodio::Sink::get_pos)
}
pub async fn run(&mut self) -> Result<()> {
const RUN_FREQUENCY: Duration = Duration::from_millis(10);
loop {
match self.current_rx.as_mut() {
Some(current_rx) => {
if current_rx.try_recv().is_ok() {
self.playing_since = self.get_pos();
self.current_rx = self.preload_rx.take();
if let Some(track) = self.track_mut() {
track.reset_download();
}
self.go_next();
} else if self.repeat_mode == RepeatMode::One {
if let Some(duration) = self.track().and_then(Track::duration) {
let remaining = duration.saturating_sub(self.get_pos());
if remaining <= RUN_FREQUENCY * 2 {
if self.set_progress(Percentage::ZERO).is_ok() {
self.notify(Event::Play);
} else {
self.clear();
}
}
}
} else if self.preload_rx.is_none()
&& self.is_playing()
&& self.repeat_mode() != RepeatMode::One
&& self.track().is_some_and(Track::is_complete)
{
let next_position = self.position.saturating_add(1);
if let Some(next_track) = self.queue.get(next_position) {
let next_track_id = next_track.id();
let next_track_typ = next_track.typ();
if !self.skip_tracks.contains(&next_track_id) {
match self.load_track(next_position).await {
Ok(rx) => {
self.preload_rx = rx;
}
Err(e) => {
error!("failed to preload next {next_track_typ}: {e}");
self.mark_unavailable(next_track_id);
}
}
}
}
}
}
None => {
if let Some(track) = self.track() {
let track_id = track.id();
let track_typ = track.typ();
if self.skip_tracks.contains(&track_id) {
self.go_next();
} else {
match self.load_track(self.position).await {
Ok(rx) => {
if let Some(rx) = rx {
self.current_rx = Some(rx);
self.notify(Event::TrackChanged);
if self.is_playing() {
self.notify(Event::Play);
}
}
}
Err(e) => {
error!("failed to load {track_typ}: {e}");
self.mark_unavailable(track_id);
}
}
}
}
}
}
tokio::time::sleep(RUN_FREQUENCY).await;
}
}
/// Adds the track to the set of tracks to skip, logging a warning the first
/// time it is added.
fn mark_unavailable(&mut self, track_id: TrackId) {
    let newly_marked = self.skip_tracks.insert(track_id);
    if newly_marked {
        warn!("marking track {track_id} as unavailable");
    }
}
/// Sends an event on the registered channel, if any. A failed send is
/// logged and otherwise ignored.
fn notify(&self, event: Event) {
    let Some(event_tx) = self.event_tx.as_ref() else {
        return;
    };

    if let Err(e) = event_tx.send(event) {
        error!("failed to send event: {e}");
    }
}
/// Registers the channel on which player events are sent, replacing any
/// previously registered channel.
pub fn register(&mut self, event_tx: tokio::sync::mpsc::UnboundedSender<Event>) {
self.event_tx = Some(event_tx);
}
fn sink_mut(&mut self) -> Result<&mut rodio::Sink> {
self.sink
.as_mut()
.ok_or_else(|| Error::unavailable("audio sink not available"))
}
/// Opens the output device if needed and resumes the sink.
///
/// Does nothing further when already playing. For a livestream the sink
/// position at resume time becomes the new `playing_since`. `Play` is sent
/// only when a track is loaded.
///
/// # Errors
///
/// Propagates errors from opening the output device.
pub fn play(&mut self) -> Result<()> {
    self.start()?;
    if self.is_playing() {
        return Ok(());
    }

    debug!("starting playback");
    let sink = self.sink_mut()?;
    sink.play();
    let pos = sink.get_pos();

    if self.track().is_some_and(Track::is_livestream) {
        self.playing_since = pos;
    }
    if self.is_loaded() {
        self.notify(Event::Play);
    }
    Ok(())
}
/// Returns whether a current track has been queued for playback, that is,
/// whether `current_rx` is set.
#[must_use]
pub fn is_loaded(&self) -> bool {
self.current_rx.is_some()
}
/// Pauses the sink, if any, and sends `Pause`. The event is sent even when
/// there is no sink.
pub fn pause(&mut self) {
    debug!("pausing playback");
    if let Ok(sink) = self.sink_mut() {
        sink.pause();
    }
    self.notify(Event::Pause);
}
/// Returns whether a track is loaded and the sink exists and is not paused.
#[must_use]
pub fn is_playing(&self) -> bool {
    if self.current_rx.is_none() {
        return false;
    }
    match &self.sink {
        Some(sink) => !sink.is_paused(),
        None => false,
    }
}
/// Starts playback when `should_play` is true, pauses otherwise.
///
/// # Errors
///
/// Propagates errors from `play`; pausing cannot fail.
pub fn set_playing(&mut self, should_play: bool) -> Result<()> {
    if !should_play {
        self.pause();
        return Ok(());
    }
    self.play()
}
/// Returns the track at the current position, or `None` when the position
/// is outside the queue.
#[must_use]
#[inline]
pub fn track(&self) -> Option<&Track> {
self.queue.get(self.position)
}
/// Mutable variant of `track`: the track at the current position, or
/// `None` when the position is outside the queue.
#[must_use]
#[inline]
pub fn track_mut(&mut self) -> Option<&mut Track> {
self.queue.get_mut(self.position)
}
pub fn set_queue(&mut self, tracks: Vec<Track>) {
self.clear();
self.position = 0;
self.queue = tracks;
self.skip_tracks = HashSet::new();
}
/// Returns the track following the current position, if any.
#[must_use]
#[inline]
pub fn next_track(&self) -> Option<&Track> {
    self.queue.get(self.position.saturating_add(1))
}
/// Mutable variant of `next_track`: the track following the current
/// position, if any.
#[must_use]
#[inline]
pub fn next_track_mut(&mut self) -> Option<&mut Track> {
    self.queue.get_mut(self.position.saturating_add(1))
}
/// Rebuilds the queue in the order given by `track_ids`.
///
/// Ids that are not in the queue are ignored, and queued tracks whose id is
/// not listed are dropped. Downloads are reset for every kept track except
/// the current and the next one. The position follows the current track, or
/// falls back to the start when it is no longer present. The preloaded
/// track is discarded.
pub fn reorder_queue(&mut self, track_ids: &[TrackId]) {
    let current_id = self.track().map(Track::id);
    let next_id = self.next_track().map(Track::id);

    let mut reordered = Vec::with_capacity(track_ids.len());
    for wanted in track_ids {
        let Some(index) = self.queue.iter().position(|track| track.id() == *wanted) else {
            continue;
        };

        let mut track = self.queue.remove(index);
        let id = Some(track.id());
        if id != current_id && id != next_id {
            track.reset_download();
        }
        reordered.push(track);
    }

    self.position = reordered
        .iter()
        .position(|track| Some(track.id()) == current_id)
        .unwrap_or_default();
    self.queue = reordered;

    // Discard the preloaded track; it may no longer be the next one.
    self.preload_rx = None;
    if let Some(sources) = self.sources.as_ref() {
        sources.clear();
    }
}
/// Appends `tracks` to the end of the queue.
pub fn extend_queue(&mut self, tracks: Vec<Track>) {
self.queue.extend(tracks);
}
/// Moves the playlist position to `target`.
///
/// Does nothing when already there. When moving to the next track while it
/// is preloaded and the player is playing, the current track is seeked to
/// its end instead, leaving the position change to the normal end-of-track
/// handling. In all other cases, or when that seek fails, playback state is
/// cleared and the position is set directly.
pub fn set_position(&mut self, target: usize) {
    if self.position == target {
        return;
    }
    info!("setting playlist position to {target}");

    let is_next = target == self.position.saturating_add(1);
    if is_next && self.preload_rx.is_some() && self.is_playing() {
        if let Err(e) = self.set_progress(Percentage::ONE_HUNDRED) {
            warn!("failed to seek to end of current track: {e}");
        } else {
            return;
        }
    }

    self.clear();
    self.position = target;
}
/// Discards everything queued for playback.
///
/// The volume is ramped down to zero, the sink is stopped and given a fresh
/// sources queue, and the previous sink volume is restored. Downloads of
/// the current and next track are reset, and the per-track playback state
/// (`playing_since`, `current_rx`, `preload_rx`) is cleared.
pub fn clear(&mut self) {
    // Fade out first so that stopping the sink is not audible as a click.
    let original_volume = self.ramp_volume(0.0);

    if let Ok(sink) = self.sink_mut() {
        sink.stop();
        if let Ok(volume) = original_volume {
            sink.set_volume(volume);
        }

        let (sources, output) = rodio::queue::queue(true);
        sink.append(output);
        self.sources = Some(sources);
    }

    let next = self.position.saturating_add(1);
    for index in [self.position, next] {
        if let Some(track) = self.queue.get_mut(index) {
            track.reset_download();
        }
    }

    self.playing_since = Duration::ZERO;
    self.current_rx = None;
    self.preload_rx = None;
}
/// Returns the current repeat mode.
#[must_use]
#[inline]
pub fn repeat_mode(&self) -> RepeatMode {
self.repeat_mode
}
/// Sets the repeat mode. Switching to `One` also discards the preloaded
/// track.
pub fn set_repeat_mode(&mut self, repeat_mode: RepeatMode) {
    info!("setting repeat mode to {repeat_mode}");
    self.repeat_mode = repeat_mode;

    if repeat_mode != RepeatMode::One {
        return;
    }

    // NOTE(review): the next track's download is not reset here, while
    // `load_track` skips tracks that still have a download handle — confirm
    // the next track can be loaded again after leaving repeat-one mode.
    if let Some(sources) = self.sources.as_ref() {
        sources.clear();
    }
    self.preload_rx = None;
}
/// Returns the volume as last set, before logarithmic scaling.
#[must_use]
#[inline]
pub fn volume(&self) -> Percentage {
self.volume
}
/// Maps a linear volume ratio onto an exponential curve.
///
/// Values strictly between zero and `UNITY_GAIN` are mapped to
/// `exp(LOG_VOLUME_GROWTH_RATE * volume) / LOG_VOLUME_SCALE_FACTOR`, and
/// additionally scaled by `volume * 10` below 0.1 so the curve reaches zero
/// smoothly. All other values (including NaN) are returned unchanged.
#[must_use]
fn log_volume(volume: f32) -> f32 {
    let in_range = volume > 0.0 && volume < UNITY_GAIN;
    if !in_range {
        return volume;
    }

    let curved = f32::exp(Self::LOG_VOLUME_GROWTH_RATE * volume) / Self::LOG_VOLUME_SCALE_FACTOR;
    if volume < 0.1 {
        curved * (volume * 10.0)
    } else {
        curved
    }
}
/// Sets the volume and, when the output device is open, ramps the sink to
/// the logarithmically scaled value.
///
/// Returns the unchanged volume when `target` equals the current one, the
/// previous volume when the device is not open, and otherwise the sink
/// volume from before the ramp as a percentage.
///
/// # Errors
///
/// Propagates errors from `ramp_volume`.
pub fn set_volume(&mut self, target: Percentage) -> Result<Percentage> {
    let previous = self.volume;
    if target == previous {
        return Ok(previous);
    }

    info!("setting volume to {target}");
    self.volume = target;

    let linear = target.as_ratio().clamp(0.0, UNITY_GAIN);
    let scaled = Self::log_volume(linear);

    // Only mention the scaling when it changed the value (relative compare).
    let difference = (linear - scaled).abs();
    let magnitude = linear.abs() + scaled.abs();
    if 2.0 * difference > f32::EPSILON * magnitude {
        debug!(
            "volume scaled logarithmically to {}",
            Percentage::from_ratio(scaled)
        );
    }

    if !self.is_started() {
        return Ok(previous);
    }
    self.ramp_volume(scaled).map(Percentage::from_ratio)
}
/// Ramps the sink volume linearly to `target` over `FADE_DURATION`, in
/// steps of one millisecond, and returns the sink volume from before the
/// ramp.
///
/// This blocks the calling thread for the duration of the ramp.
///
/// # Errors
///
/// Returns an unavailable error when there is no sink.
fn ramp_volume(&mut self, target: f32) -> Result<f32> {
    let sink = self.sink_mut()?;
    let start = sink.volume();

    let steps = Self::FADE_DURATION.as_millis();
    let increment = (target - start) / steps.to_f32_lossy();

    for step in 1..=steps {
        // Land exactly on the target in the final step.
        let level = if step < steps {
            start + increment * step.to_f32_lossy()
        } else {
            target
        };
        sink.set_volume(level);
        std::thread::sleep(Duration::from_millis(1));
    }

    Ok(start)
}
/// Returns the playback progress within the current track.
///
/// `None` when there is no current track or its duration is unknown.
/// Livestreams always report one hundred percent; a track that is not
/// loaded yet reports zero.
#[must_use]
pub fn progress(&self) -> Option<Percentage> {
    let track = self.track()?;

    if track.is_livestream() {
        return Some(Percentage::ONE_HUNDRED);
    }
    if !self.is_loaded() {
        return Some(Percentage::ZERO);
    }

    let duration = track.duration()?;
    let elapsed = self.get_pos().saturating_sub(self.playing_since);
    Some(Percentage::from_ratio(elapsed.div_duration_f32(duration)))
}
/// Returns the duration of the current track.
///
/// For a livestream this is the time played since `playing_since`, or
/// `None` when there is no sink. `None` is also returned when there is no
/// current track.
pub fn duration(&self) -> Option<Duration> {
    let track = self.track()?;

    if track.is_livestream() {
        let sink = self.sink.as_ref()?;
        Some(sink.get_pos().saturating_sub(self.playing_since))
    } else {
        track.duration()
    }
}
/// Seeks the current track to `progress` (a fraction of its duration).
///
/// The ratio is clamped to `0.0..=1.0`. While the track is still
/// downloading, the target is limited to the buffered part. When the seek
/// cannot be performed yet (download not started, no sink, or the seek is
/// reported as unavailable or unimplemented), the position is stored and
/// applied by the next `load_track` call.
///
/// Does nothing when there is no current track.
///
/// # Errors
///
/// Returns an unavailable error when the track's duration is unknown, and
/// propagates seek errors other than unavailable/unimplemented.
pub fn set_progress(&mut self, progress: Percentage) -> Result<()> {
    if let Some(track) = self.track() {
        let duration = track.duration().ok_or_else(|| {
            Error::unavailable(format!("duration unknown for {} {track}", track.typ()))
        })?;

        let ratio = progress.as_ratio();
        let mut position = duration.mul_f32(ratio.clamp(0.0, 1.0));
        let minutes = position.as_secs() / 60;
        let seconds = position.as_secs() % 60;
        info!(
            "seeking {} {track} to {minutes:02}:{seconds:02} ({progress})",
            track.typ()
        );

        // An incomplete download can only be seeked within the buffered part.
        if !track.is_complete() {
            if let Some(buffered) = track.buffered() {
                // Warn only when the target was actually limited; previously
                // the warning was emitted for every seek on a track that was
                // still buffering.
                if position > buffered {
                    position = buffered;
                    let minutes = position.as_secs() / 60;
                    let seconds = position.as_secs() % 60;
                    warn!("limiting seek to {minutes:02}:{seconds:02} due to buffering");
                }
            }
        }

        match track
            .handle()
            .ok_or_else(|| {
                Error::unavailable(format!(
                    "download of {} {track} not yet started",
                    track.typ()
                ))
            })
            .and_then(|_| {
                self.sink_mut()
                    .and_then(|sink| sink.try_seek(position).map_err(Into::into))
            }) {
            Ok(()) => {
                // After a seek the sink position is relative to the track.
                self.playing_since = Duration::ZERO;
                self.deferred_seek = None;
            }
            Err(e) => {
                if matches!(e.kind, ErrorKind::Unavailable | ErrorKind::Unimplemented) {
                    // Not seekable right now: apply the seek when loading.
                    self.deferred_seek = Some(position);
                } else {
                    return Err(e);
                }
            }
        }
    }

    Ok(())
}
/// Returns the index of the current track in the queue.
#[must_use]
#[inline]
pub fn position(&self) -> usize {
self.position
}
/// Sets the license token that is passed along when a track is loaded.
#[inline]
pub fn set_license_token(&mut self, license_token: impl Into<String>) {
self.license_token = license_token.into();
}
/// Enables or disables normalization; the flag is read when a track is
/// loaded.
#[inline]
pub fn set_normalization(&mut self, normalization: bool) {
self.normalization = normalization;
}
/// Sets the normalization target in dB. The new target is logged only
/// while normalization is enabled, but it is stored either way.
pub fn set_gain_target_db(&mut self, gain_target_db: i8) {
if self.normalization {
info!("normalizing volume to {gain_target_db} dB");
}
self.gain_target_db = gain_target_db;
}
/// Sets the audio quality that is passed along when a track is loaded.
#[inline]
pub fn set_audio_quality(&mut self, quality: AudioQuality) {
self.audio_quality = quality;
}
/// Returns whether normalization is enabled.
#[must_use]
#[inline]
pub fn normalization(&self) -> bool {
self.normalization
}
/// Returns the current license token.
#[must_use]
#[inline]
pub fn license_token(&self) -> &str {
&self.license_token
}
/// Returns the configured audio quality.
#[must_use]
#[inline]
pub fn audio_quality(&self) -> AudioQuality {
self.audio_quality
}
/// Returns the normalization target in dB.
#[must_use]
#[inline]
pub fn gain_target_db(&self) -> i8 {
self.gain_target_db
}
/// Sets the media URL that is passed along when a track is loaded.
#[inline]
pub fn set_media_url(&mut self, url: Url) {
self.media_url = url;
}
/// Returns whether the output device is open, that is, whether a sink
/// exists.
#[must_use]
#[inline]
pub fn is_started(&self) -> bool {
self.sink.is_some()
}
}
/// Stops playback and releases the output device when the player is dropped.
impl Drop for Player {
fn drop(&mut self) {
self.stop();
}
}