use std::{
collections::{HashMap, HashSet},
fmt::Write,
ops::ControlFlow,
pin::Pin,
time::Duration,
};
use futures_util::{SinkExt, StreamExt, stream::SplitSink};
use log::Level;
use rand::prelude::*;
use semver;
use time::OffsetDateTime;
use tokio::process::Command;
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream,
tungstenite::{
Message as WebsocketMessage,
client::ClientRequestBuilder,
protocol::{WebSocketConfig, frame::Frame},
},
};
use uuid::Uuid;
use crate::{
config::{Config, Credentials},
error::{Error, Result},
events::Event,
gateway::Gateway,
player::Player,
protocol::connect::{
Body, Channel, Contents, DeviceId, DeviceType, Headers, Ident, Message, Percentage,
QueueItem, RepeatMode, Status, UserId,
queue::{self, MixType},
stream,
},
proxy,
tokens::UserToken,
track::{DEFAULT_BITS_PER_SAMPLE, DEFAULT_SAMPLE_RATE, Track, TrackId},
util::ToF32,
};
/// Remote-control client: logs in through the gateway, keeps a websocket
/// open, answers controller discovery/connection requests and drives the
/// audio player from the commands it receives.
pub struct Client {
/// Identifier of this device; used as the `from` header of outgoing
/// messages and to recognize (and ignore) its own messages.
device_id: DeviceId,
/// Device name advertised in connection offers.
device_name: String,
/// Device type advertised in connection offers.
device_type: DeviceType,
/// Credentials used by `start` to obtain an ARL (either directly or by
/// logging in with email and password).
credentials: Credentials,
/// Gateway used for login, user tokens, cookies and queue lookups.
gateway: Gateway,
/// Most recently fetched user token; `None` until `start` has obtained one.
user_token: Option<UserToken>,
/// Sender used by `handle_status` to pass a fresh token time-to-live to the
/// loop in `start`.
time_to_live_tx: tokio::sync::mpsc::Sender<Duration>,
/// Receiver polled by the loop in `start` to re-arm the token expiry timer.
time_to_live_rx: tokio::sync::mpsc::Receiver<Duration>,
/// Condensed application version, sent as the `version` query parameter of
/// the websocket URL.
version: String,
/// Write half of the websocket; `None` until `start` has connected.
websocket_tx:
Option<SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, WebsocketMessage>>,
/// Channel identifiers this client is currently subscribed to.
subscriptions: HashSet<Ident>,
/// Whether a controller is connected, and with which session.
connection_state: ConnectionState,
/// Receive watchdog: re-armed whenever the controller sends a message; when
/// it elapses while connected, the client disconnects.
watchdog_rx: Pin<Box<tokio::time::Sleep>>,
/// Transmit watchdog: re-armed whenever a message is sent; when it elapses
/// while connected, a ping is sent.
watchdog_tx: Pin<Box<tokio::time::Sleep>>,
/// Whether this device accepts, is negotiating, or refuses connections.
discovery_state: DiscoveryState,
/// Per controller, the discovery session for which an offer was already sent.
discovery_sessions: HashMap<DeviceId, String>,
/// Receiving end of the event channel registered with the player.
event_rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
/// Sending end of the event channel; also used for connect/disconnect events.
event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
/// State of the optional configured initial volume.
initial_volume: InitialVolume,
/// When `true`, the device stays available for new controllers while one is
/// connected; otherwise further connect requests are ignored.
interruptions: bool,
/// Optional program spawned on every event, with event details passed as
/// environment variables.
hook: Option<String>,
/// Audio player driven by controller commands.
player: Player,
/// Timer that triggers periodic playback progress reports while connected.
reporting_timer: Pin<Box<tokio::time::Sleep>>,
/// Queue most recently published by the controller, if any.
queue: Option<queue::List>,
/// Position requested by a skip whose queue did not match the local one;
/// applied when the next queue is published.
deferred_position: Option<usize>,
/// When `true`, received messages are only logged and never dispatched.
eavesdrop: bool,
}
/// Whether this device can be claimed by a controller.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum DiscoveryState {
/// Connect requests are accepted (initial state, and state after a reset).
Available,
/// A `Ready` message was sent to `controller`; waiting for its status
/// reply that references `ready_message_id`.
Connecting {
controller: DeviceId,
ready_message_id: String,
},
/// A controller is connected and interruptions are disabled; connect
/// requests are ignored.
Taken,
}
/// Whether a controller is currently connected.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum ConnectionState {
/// No controller is connected.
Disconnected,
/// `controller` is connected; `session_id` is generated when the connection
/// is established and is included in playback stream reports.
Connected {
controller: DeviceId,
session_id: Uuid,
},
}
/// Direction of a queue reordering performed by `shuffle_queue`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
enum ShuffleAction {
/// Randomize the track order and record the permutation.
Shuffle,
/// Restore the order recorded in the queue's `tracks_order`.
Unshuffle,
}
/// State of the optional initial volume taken from the configuration.
#[derive(Copy, Clone, Debug, PartialEq)]
enum InitialVolume {
/// The initial volume is in effect: it is applied when playback starts and
/// replaces a controller volume of 100%.
Active(Percentage),
/// The controller set a volume below 100%, so the initial volume is no
/// longer applied; it becomes `Active` again when the states are reset.
Inactive(Percentage),
/// No initial volume was configured.
Disabled,
}
/// Returns the instant that lies `seconds` after the current tokio instant,
/// or `None` when that instant cannot be represented.
#[must_use]
#[inline]
fn from_now(seconds: Duration) -> Option<tokio::time::Instant> {
    let now = tokio::time::Instant::now();
    now.checked_add(seconds)
}
impl Client {
/// Upper bound applied to each gateway request made by this client.
const NETWORK_TIMEOUT: Duration = Duration::from_secs(2);
/// Safety margin subtracted from token, session and JWT lifetimes so that
/// renewal happens before the actual expiry.
const TOKEN_EXPIRATION_THRESHOLD: Duration = Duration::from_secs(60);
/// Interval between periodic playback progress reports.
const REPORTING_INTERVAL: Duration = Duration::from_secs(3);
/// Time without a message from the controller after which it is considered
/// unresponsive.
const WATCHDOG_RX_TIMEOUT: Duration = Duration::from_secs(10);
/// Time without an outgoing message after which a ping is sent.
const WATCHDOG_TX_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum websocket frame size: a quarter of the maximum message size.
const FRAME_SIZE_MAX: usize = Self::MESSAGE_SIZE_MAX / 4;
/// Maximum websocket message size: half of the write buffer size.
const MESSAGE_SIZE_MAX: usize = Self::MESSAGE_BUFFER_MAX / 2;
/// Maximum websocket write buffer size in bytes (256 KiB).
const MESSAGE_BUFFER_MAX: usize = 2 * 128 * 1024;
/// Session lifetime assumed when the session cookie carries no expiry (4 hours).
const SESSION_DEFAULT_TTL: Duration = Duration::from_secs(4 * 3600);
/// Name of the cookie whose expiry determines the session lifetime.
const SESSION_COOKIE_NAME: &'static str = "bm_sz";
/// JWT lifetime assumed when the JWT cookie carries no expiry (30 days).
const JWT_DEFAULT_TTL: Duration = Duration::from_secs(30 * 24 * 3600);
/// Name of the cookie whose expiry determines the JWT lifetime.
const JWT_COOKIE_NAME: &'static str = "refresh-token";
/// Base URL of the websocket endpoint; the user token and version are appended.
const WEBSOCKET_URL: &'static str = "wss://live.deezer.com/ws/";
/// Builds a client from the application configuration and an audio player.
///
/// The configured application version is parsed as semver and condensed into
/// a digit string without leading zero components (`major`, two-digit
/// `minor`, three-digit `patch`). The player is registered with the sending
/// half of the event channel so its events reach this client.
///
/// # Errors
///
/// Fails when `config.app_version` is not valid semver or when the gateway
/// cannot be created from `config`.
pub fn new(config: &Config, player: Player) -> Result<Self> {
    let semver::Version {
        major,
        minor,
        patch,
        ..
    } = semver::Version::parse(&config.app_version)?;

    // Leading zero components are dropped entirely rather than padded.
    let version = match (major, minor) {
        (0, 0) => format!("{patch}"),
        (0, _) => format!("{minor}{patch:0>3}"),
        _ => format!("{major}{minor:0>2}{patch:0>3}"),
    };
    trace!("remote version: {version}");

    // All timers start out elapsed; they are re-armed as the session progresses.
    let reporting_timer = Box::pin(tokio::time::sleep(Duration::ZERO));
    let watchdog_rx = Box::pin(tokio::time::sleep(Duration::ZERO));
    let watchdog_tx = Box::pin(tokio::time::sleep(Duration::ZERO));

    let (time_to_live_tx, time_to_live_rx) = tokio::sync::mpsc::channel(1);
    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();

    let mut player = player;
    player.register(event_tx.clone());

    let initial_volume = config
        .initial_volume
        .map_or(InitialVolume::Disabled, InitialVolume::Active);

    Ok(Self {
        device_id: config.device_id.into(),
        device_name: config.device_name.clone(),
        device_type: config.device_type,
        credentials: config.credentials.clone(),
        gateway: Gateway::new(config)?,
        user_token: None,
        time_to_live_tx,
        time_to_live_rx,
        version,
        websocket_tx: None,
        subscriptions: HashSet::new(),
        connection_state: ConnectionState::Disconnected,
        watchdog_rx,
        watchdog_tx,
        event_rx,
        event_tx,
        player,
        reporting_timer,
        discovery_state: DiscoveryState::Available,
        discovery_sessions: HashMap::new(),
        initial_volume,
        interruptions: config.interruptions,
        hook: config.hook.clone(),
        queue: None,
        deferred_position: None,
        eavesdrop: config.eavesdrop,
    })
}
/// Fetches a user token that stays valid for longer than
/// `TOKEN_EXPIRATION_THRESHOLD`.
///
/// Returns the token together with its remaining lifetime minus that
/// threshold. A token that is already within the threshold is flushed from
/// the gateway and a new one is requested.
///
/// # Errors
///
/// Fails when a gateway request times out or returns an error.
async fn user_token(&mut self) -> Result<(UserToken, Duration)> {
    loop {
        let request = self.gateway.user_token();
        let token = tokio::time::timeout(Self::NETWORK_TIMEOUT, request).await??;

        let Some(remaining) = token
            .time_to_live()
            .checked_sub(Self::TOKEN_EXPIRATION_THRESHOLD)
        else {
            // Too close to expiry: discard it and ask for a fresh one.
            self.gateway.flush_user_token();
            continue;
        };

        debug!(
            "user data time to live: {:.0}s",
            remaining.as_secs_f32().ceil(),
        );
        return Ok((token, remaining));
    }
}
/// Copies the user-specific settings held by the gateway to the player:
/// audio quality, target gain, license token (when present) and media URL.
fn set_player_settings(&mut self) {
    let quality = self.gateway.audio_quality();
    info!("user casting quality: {quality}");
    self.player.set_audio_quality(quality);

    let target_gain = self.gateway.target_gain();
    self.player.set_gain_target_db(target_gain);

    if let Some(token) = self.gateway.license_token() {
        self.player.set_license_token(token);
    }

    let media_url = self.gateway.media_url();
    self.player.set_media_url(media_url);
}
/// Serializes the gateway's unexpired cookies into a `Cookie` header value.
///
/// Each cookie is written as `name=value`; pairs are separated by `"; "`
/// (semicolon followed by a space), which is the separator the `Cookie`
/// header grammar in RFC 6265 prescribes. Returns an empty string when the
/// gateway has no cookie store or no unexpired cookies.
fn cookie_str(&self) -> String {
    let mut cookie_str = String::new();
    if let Some(cookies) = self.gateway.cookies() {
        for cookie in cookies.iter_unexpired() {
            if !cookie_str.is_empty() {
                cookie_str.push_str("; ");
            }
            let (name, value) = cookie.name_value();
            // Writing to a `String` cannot fail, so the result is ignored.
            let _ = write!(cookie_str, "{name}={value}");
        }
    }
    cookie_str
}
/// Returns the remaining lifetime of the cookie called `name`, if known.
///
/// All cookies in the store (expired or not) with a matching name are
/// inspected; the last one that yields a usable lifetime wins. `Max-Age` is
/// preferred; otherwise the `Expires` timestamp is compared with the current
/// UTC time. An expiry in the past is logged and contributes no value.
fn cookie_ttl(&self, name: &str) -> Option<Duration> {
    let cookies = self.gateway.cookies()?;
    let mut ttl = None;

    for cookie in cookies.iter_any().filter(|cookie| cookie.name() == name) {
        if let Some(max_age) = cookie.max_age().and_then(|age| age.try_into().ok()) {
            ttl = Some(max_age);
            continue;
        }

        let Some(expires) = cookie.expires_datetime() else {
            continue;
        };
        match (expires - OffsetDateTime::now_utc()).try_into() {
            Ok(remaining) => ttl = Some(remaining),
            Err(_) => warn!("{name} expiry in the past: {expires}"),
        }
    }

    ttl
}
/// Time until the session should be renewed: the session cookie's lifetime
/// (or `SESSION_DEFAULT_TTL` when unknown) minus the expiration threshold,
/// never below zero.
fn session_ttl(&self) -> Duration {
    let lifetime = self
        .cookie_ttl(Self::SESSION_COOKIE_NAME)
        .unwrap_or(Self::SESSION_DEFAULT_TTL);
    lifetime.saturating_sub(Self::TOKEN_EXPIRATION_THRESHOLD)
}
/// Time until the JWT should be renewed: the JWT cookie's lifetime (or
/// `JWT_DEFAULT_TTL` when unknown) minus the expiration threshold, never
/// below zero.
fn jwt_ttl(&self) -> Duration {
    let lifetime = self
        .cookie_ttl(Self::JWT_COOKIE_NAME)
        .unwrap_or(Self::JWT_DEFAULT_TTL);
    lifetime.saturating_sub(Self::TOKEN_EXPIRATION_THRESHOLD)
}
/// Logs in, connects to the websocket and runs the client's event loop.
///
/// The loop multiplexes the watchdogs, the token/session/JWT expiry timers,
/// the progress reporting timer, incoming websocket messages, the player
/// and player events. It ends when the user token expires, the websocket
/// fails or is closed, message handling requests a stop, or disconnecting
/// after an audio error fails. `stop` is always called before returning.
///
/// # Errors
///
/// Returns an error when login, token retrieval or the websocket connection
/// fails, and otherwise the error that ended the loop.
#[allow(clippy::too_many_lines)]
pub async fn start(&mut self) -> Result<()> {
// Forget which discovery sessions were already answered in a previous run.
self.discovery_sessions = HashMap::new();
// Obtain an ARL, either from the credentials or by logging in.
let arl = match self.credentials.clone() {
Credentials::Login { email, password } => {
info!("logging in with email and password");
tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.oauth(&email, &password))
.await??
}
Credentials::Arl(arl) => {
info!("using ARL from secrets file");
arl
}
};
// A failed or timed-out JWT login is only logged; startup continues.
match tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.login_with_arl(&arl)).await {
Ok(inner) => {
if let Err(e) = inner {
warn!("jwt login failed: {e}");
} else {
debug!("jwt logged in");
}
}
Err(e) => warn!("jwt login timed out: {e}"),
}
let (user_token, token_ttl) = self.user_token().await?;
debug!("user id: {}", user_token.user_id);
// The websocket URL embeds the user token and the condensed app version.
let uri = format!(
"{}{}?version={}",
Self::WEBSOCKET_URL,
user_token,
self.version
);
let mut request = ClientRequestBuilder::new(uri.parse::<http::Uri>()?);
self.user_token = Some(user_token);
// Pass the gateway's cookies along with the websocket handshake.
let cookie_str = self.cookie_str();
request = request.with_header(http::header::COOKIE.as_str(), cookie_str);
// Timers that fire when the user token, session and JWT are due.
let token_expiry = tokio::time::sleep(token_ttl);
tokio::pin!(token_expiry);
let mut session_ttl = self.session_ttl();
debug!(
"session time to live: {:.0}s",
session_ttl.as_secs_f32().ceil()
);
let session_expiry = tokio::time::sleep(session_ttl);
tokio::pin!(session_expiry);
let mut jwt_ttl = self.jwt_ttl();
debug!("jwt time to live: {:.0}s", jwt_ttl.as_secs_f32().ceil());
let jwt_expiry = tokio::time::sleep(jwt_ttl);
tokio::pin!(jwt_expiry);
// Bound the websocket's buffer, message and frame sizes.
let config = Some(
WebSocketConfig::default()
.max_write_buffer_size(Self::MESSAGE_BUFFER_MAX)
.max_message_size(Some(Self::MESSAGE_SIZE_MAX))
.max_frame_size(Some(Self::FRAME_SIZE_MAX)),
);
// Connect through an HTTP proxy when one is configured in the environment.
let (ws_stream, _) = if let Some(proxy) = proxy::Http::from_env() {
info!("using proxy: {proxy}");
let tcp_stream = proxy.connect_async(&uri).await?;
tokio_tungstenite::client_async_tls_with_config(request, tcp_stream, config, None)
.await?
} else {
tokio_tungstenite::connect_async_with_config(request, config, false).await?
};
// Keep the write half on `self`; the read half is polled by the loop below.
let (websocket_tx, mut websocket_rx) = ws_stream.split();
self.websocket_tx = Some(websocket_tx);
self.subscribe(Ident::Stream).await?;
self.subscribe(Ident::RemoteDiscover).await?;
if self.eavesdrop {
warn!("not discoverable: eavesdropping on websocket");
} else {
info!("ready for discovery");
}
let loop_result = loop {
tokio::select! {
// Poll the branches in the order written instead of randomly.
biased;
// Nothing was sent for a while: ping the controller.
() = &mut self.watchdog_tx, if self.is_connected() => {
if let Err(e) = self.send_ping().await {
error!("error sending ping: {e}");
}
}
// Nothing was received from the controller for a while: drop it.
() = &mut self.watchdog_rx, if self.is_connected() => {
error!("controller is not responding");
let _drop = self.disconnect().await;
}
// The user token is due: end the loop.
() = &mut token_expiry => {
break Err(Error::deadline_exceeded("user token expired"));
}
// The session is due: try to renew it. On failure the previous time to
// live is reused for the next attempt.
() = &mut session_expiry => {
match tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.refresh()).await {
Ok(inner) => {
match inner {
Ok(()) => {
debug!("session renewed");
session_ttl = self.session_ttl();
}
Err(e) => {
error!("session renewal failed: {e}");
}
}
}
Err(e) => error!("session renewal timed out: {e}"),
}
debug!("session time to live: {:.0}s", session_ttl.as_secs_f32().ceil());
if let Some(deadline) = tokio::time::Instant::now().checked_add(session_ttl) {
session_expiry.as_mut().reset(deadline);
}
}
// The JWT is due: try to renew it, with the same retry behavior.
() = &mut jwt_expiry => {
match tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.renew_login()).await {
Ok(inner) => {
match inner {
Ok(()) => {
debug!("jwt renewed");
jwt_ttl = self.jwt_ttl();
}
Err(e) => {
warn!("jwt renewal failed: {e}");
}
}
}
Err(e) => warn!("jwt renewal timed out: {e}"),
}
debug!("jwt time to live: {:.0}s", jwt_ttl.as_secs_f32().ceil());
if let Some(deadline) = tokio::time::Instant::now().checked_add(jwt_ttl) {
jwt_expiry.as_mut().reset(deadline);
}
}
// A new user token was fetched on connect: re-arm the token expiry timer.
Some(token_ttl) = self.time_to_live_rx.recv() => {
if let Some(deadline) = tokio::time::Instant::now().checked_add(token_ttl) {
token_expiry.as_mut().reset(deadline);
}
}
// Periodic progress report to the controller.
() = &mut self.reporting_timer, if self.is_connected() => {
if let Err(e) = self.report_playback_progress().await {
error!("error reporting playback progress: {e}");
}
}
// Incoming websocket traffic.
Some(message) = websocket_rx.next() => {
match message {
Ok(message) => {
let message_size = message.len();
if message_size > Self::MESSAGE_SIZE_MAX {
error!("ignoring oversized message with {message_size} bytes");
continue;
}
if let ControlFlow::Break(e) = self.handle_message(&message).await {
break Err(Error::internal(format!("error handling message: {e}")));
}
}
Err(e) => break Err(Error::cancelled(e.to_string())),
}
}
// The player reported an error: disconnect; only a failing disconnect ends the loop.
Err(e) = self.player.run(), if self.player.is_started() => {
error!("disconnecting due to audio stream error: {e}");
if let Err(e) = self.disconnect().await {
error!("error disconnecting: {e}");
break Err(e);
}
}
// Events emitted by the player or by connection state changes.
Some(event) = self.event_rx.recv() => {
self.handle_event(event).await;
}
}
};
// Clean up regardless of why the loop ended.
self.stop().await;
loop_result
}
/// Reacts to an event and, when a hook is configured, runs the hook with
/// environment variables describing the event.
///
/// `Play` and `Pause` trigger an immediate progress report. `Play`
/// additionally reports the playback stream and, for a flow queue with at
/// most two tracks left after the current position, extends the queue.
/// The hook is spawned for every event and awaited; failures are logged.
#[allow(clippy::too_many_lines)]
async fn handle_event(&mut self, event: Event) {
// Prepared only when a hook is configured; the arms below add variables.
let mut command = self.hook.as_ref().map(Command::new);
let track_id = self.player.track().map(Track::id);
debug!("handling event: {event:?}");
// Report progress right away; the result is deliberately ignored.
if let Event::Pause | Event::Play = event {
let _ = self.report_playback_progress().await;
}
match event {
Event::Play => {
if let Some(track_id) = track_id {
if let Err(e) = self.report_playback(track_id).await {
error!("error streaming {track_id}: {e}");
}
if self.is_flow() {
// Tracks remaining in the queue from the player's position onward.
if self
.queue
.as_ref()
.map_or(0, |queue| queue.tracks.len())
.saturating_sub(self.player.position())
<= 2
{
if let Err(e) = self.extend_queue().await {
error!("error extending queue: {e}");
}
}
}
if let Some(command) = command.as_mut() {
command
.env("EVENT", "playing")
.env("TRACK_ID", track_id.to_string());
}
}
}
Event::Pause => {
if let Some(command) = command.as_mut() {
command.env("EVENT", "paused");
}
}
Event::TrackChanged => {
if let Some(track) = self.player.track() {
if let Some(command) = command.as_mut() {
let codec = track.codec().map_or("Unknown".to_string(), |codec| {
codec.to_string().to_uppercase()
});
let bitrate = track.bitrate();
// Values of 1000 and above are divided by 1000 and suffixed "M",
// lower values are suffixed "K".
let bitrate = match bitrate {
Some(bitrate) => {
if bitrate >= 1000 {
format!(" {}M", bitrate.to_f32_lossy() / 1000.)
} else {
format!(" {bitrate}K")
}
}
None => String::default(),
};
let channels =
match track.channels.unwrap_or(track.typ().default_channels()) {
1 => "Mono".to_string(),
2 => "Stereo".to_string(),
3 => "2.1 Stereo".to_string(),
6 => "5.1 Surround Sound".to_string(),
other => format!("{other} channels"),
};
// Missing bit depth or sample rate fall back to the defaults.
let decoded = format!(
"PCM {} bit {} kHz, {channels}",
track.bits_per_sample.unwrap_or(DEFAULT_BITS_PER_SAMPLE),
track
.sample_rate
.unwrap_or(DEFAULT_SAMPLE_RATE)
.to_f32_lossy()
/ 1000.0,
);
command
.env("EVENT", "track_changed")
.env("TRACK_TYPE", track.typ().to_string())
.env("TRACK_ID", track.id().to_string())
.env("ARTIST", track.artist())
.env("COVER_ID", track.cover_id())
.env("FORMAT", format!("{codec}{bitrate}"))
.env("DECODER", decoded);
// Optional metadata is only exported when present.
if let Some(title) = track.title() {
command.env("TITLE", title);
}
if let Some(album_title) = track.album_title() {
command.env("ALBUM_TITLE", album_title);
}
if let Some(duration) = track.duration() {
command.env("DURATION", duration.as_secs().to_string());
}
}
}
}
Event::Connected => {
if let Some(command) = command.as_mut() {
command
.env("EVENT", "connected")
.env("USER_ID", self.user_id().to_string())
.env("USER_NAME", self.gateway.user_name().unwrap_or_default());
}
}
Event::Disconnected => {
if let Some(command) = command.as_mut() {
command.env("EVENT", "disconnected");
}
}
}
// Run the hook and wait for it to exit; a missing exit code is logged as -1.
if let Some(command) = command.as_mut() {
match command.spawn() {
Ok(mut child) => match child.wait().await {
Ok(status) => {
if !status.success() {
error!(
"hook script exited with error {}",
status.code().unwrap_or(-1)
);
}
}
Err(e) => error!("failed to wait for hook script: {e}"),
},
Err(e) => error!("failed to spawn hook script: {e}"),
}
}
}
/// Returns `true` when a queue is present and the mix type of its first
/// context (or of a default context when there is none) is
/// `MIX_TYPE_USER`.
#[inline]
fn is_flow(&self) -> bool {
    let Some(queue) = self.queue.as_ref() else {
        return false;
    };

    let mix_type = queue
        .contexts
        .first()
        .unwrap_or_default()
        .container
        .mix
        .typ
        .enum_value_or_default();
    mix_type == MixType::MIX_TYPE_USER
}
/// Re-arms the receive watchdog to fire `WATCHDOG_RX_TIMEOUT` from now.
/// Does nothing when that deadline cannot be represented.
#[inline]
fn reset_watchdog_rx(&mut self) {
    let Some(deadline) = from_now(Self::WATCHDOG_RX_TIMEOUT) else {
        return;
    };
    self.watchdog_rx.as_mut().reset(deadline);
}
/// Re-arms the transmit watchdog to fire `WATCHDOG_TX_TIMEOUT` from now.
/// Does nothing when that deadline cannot be represented.
#[inline]
fn reset_watchdog_tx(&mut self) {
    let Some(deadline) = from_now(Self::WATCHDOG_TX_TIMEOUT) else {
        return;
    };
    self.watchdog_tx.as_mut().reset(deadline);
}
/// Re-arms the progress reporting timer to fire `REPORTING_INTERVAL` from
/// now. Does nothing when that deadline cannot be represented.
#[inline]
fn reset_reporting_timer(&mut self) {
    let Some(deadline) = from_now(Self::REPORTING_INTERVAL) else {
        return;
    };
    self.reporting_timer.as_mut().reset(deadline);
}
/// Shuts the client down on a best-effort basis.
///
/// Disconnects from the controller when connected, handles the events that
/// are still pending, unsubscribes from every channel and logs out of the
/// gateway. Failures along the way are logged, never returned.
pub async fn stop(&mut self) {
    if self.is_connected() {
        if let Err(e) = self.disconnect().await {
            error!("error disconnecting: {e}");
        }
    }

    // Drain the events that are already queued (e.g. the disconnect event).
    while !self.event_rx.is_empty() {
        let Some(event) = self.event_rx.recv().await else {
            continue;
        };
        self.handle_event(event).await;
    }

    // Iterate over a copy: unsubscribing mutates the set.
    for ident in self.subscriptions.clone() {
        if self.unsubscribe(ident).await.is_ok() {
            self.subscriptions.remove(&ident);
        }
    }

    let logout = tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.logout()).await;
    match logout {
        Err(e) => warn!("jwt logout timed out: {e}"),
        Ok(Err(e)) => warn!("jwt logout failed: {e}"),
        Ok(Ok(_)) => debug!("jwt logged out"),
    }
}
/// Wraps `body` in a `Send` message on `channel`, addressed from this device
/// to `destination`.
fn message(&self, destination: DeviceId, channel: Channel, body: Body) -> Message {
    let headers = Headers {
        from: self.device_id.clone(),
        destination: Some(destination),
    };
    let ident = channel.ident;

    Message::Send {
        channel,
        contents: Contents {
            ident,
            headers,
            body,
        },
    }
}
/// Builds a message for `destination` on the remote command channel.
fn command(&self, destination: DeviceId, body: Body) -> Message {
    let channel = self.channel(Ident::RemoteCommand);
    self.message(destination, channel, body)
}
/// Builds a message for `destination` on the remote discovery channel.
fn discover(&self, destination: DeviceId, body: Body) -> Message {
    let channel = self.channel(Ident::RemoteDiscover);
    self.message(destination, channel, body)
}
async fn report_playback(&mut self, track_id: TrackId) -> Result<()> {
if let ConnectionState::Connected { session_id, .. } = &self.connection_state {
let message = Message::StreamSend {
channel: self.channel(Ident::Stream),
contents: stream::Contents {
action: stream::Action::Play,
ident: stream::Ident::Limitation,
value: stream::Value {
user: self.user_id(),
uuid: *session_id,
track_id,
},
},
};
self.send_message(message).await
} else {
Err(Error::failed_precondition(
"playback reporting should have an active connection".to_string(),
))
}
}
/// Tells the controller (if any) that the connection is closed and resets
/// the local state.
///
/// The state is reset even when sending the close message fails; that
/// failure is returned afterwards.
async fn disconnect(&mut self) -> Result<()> {
    let close_result = self.send_close().await;
    self.reset_states();
    close_result
}
/// Answers a discovery request from `from` with a connection offer.
///
/// An offer is sent only once per controller and discovery session: a
/// repeated request carrying the session that was already answered is
/// ignored. The session is recorded only after the offer was sent.
///
/// # Errors
///
/// Fails when the offer cannot be sent.
async fn handle_discovery_request(
    &mut self,
    from: DeviceId,
    discovery_session_id: String,
) -> Result<()> {
    let already_offered = self
        .discovery_sessions
        .get(&from)
        .is_some_and(|session_id| *session_id == discovery_session_id);
    if already_offered {
        return Ok(());
    }

    let offer = Body::ConnectionOffer {
        message_id: Uuid::new_v4().to_string(),
        from: self.device_id.clone(),
        device_name: self.device_name.clone(),
        device_type: self.device_type,
    };
    let message = self.discover(from.clone(), offer);
    self.send_message(message).await?;

    self.discovery_sessions.insert(from, discovery_session_id);
    Ok(())
}
/// Handles a connect request from the controller `from`.
///
/// While the device is taken the request is ignored. Otherwise the client
/// subscribes to the queue and command channels, sends a `Ready` message and
/// enters the `Connecting` state until the controller confirms it.
///
/// # Errors
///
/// Fails when subscribing or sending the `Ready` message fails. If only the
/// command subscription fails, the queue subscription is rolled back.
async fn handle_connect(&mut self, from: DeviceId, _offer_id: Option<String>) -> Result<()> {
    if matches!(self.discovery_state, DiscoveryState::Taken) {
        debug!("not allowing interruptions from {from}");
        return Ok(());
    }

    self.subscribe(Ident::RemoteQueue).await?;
    if let Err(e) = self.subscribe(Ident::RemoteCommand).await {
        let _drop = self.unsubscribe(Ident::RemoteQueue).await;
        return Err(e);
    }

    let ready_message_id = Uuid::new_v4().to_string();
    let ready = Body::Ready {
        message_id: ready_message_id.clone(),
    };
    let message = self.command(from.clone(), ready);
    self.send_message(message).await?;

    // Remember which status reply completes the handshake.
    self.discovery_state = DiscoveryState::Connecting {
        controller: from,
        ready_message_id,
    };
    Ok(())
}
/// Returns `true` while a controller is connected.
#[must_use]
#[inline]
fn is_connected(&self) -> bool {
    matches!(self.connection_state, ConnectionState::Connected { .. })
}
/// Returns the controller this client is dealing with: the connected
/// controller if there is one, otherwise the controller a connection is
/// being negotiated with, otherwise `None`.
#[inline]
fn controller(&self) -> Option<DeviceId> {
    match (&self.connection_state, &self.discovery_state) {
        // Alternatives are tried left to right, so a connected controller wins.
        (ConnectionState::Connected { controller, .. }, _)
        | (_, DiscoveryState::Connecting { controller, .. }) => Some(controller.clone()),
        _ => None,
    }
}
/// Sends a `Close` command to the current controller.
///
/// Succeeds without sending anything when there is no controller.
///
/// # Errors
///
/// Fails when the message cannot be sent.
async fn send_close(&mut self) -> Result<()> {
    let Some(controller) = self.controller() else {
        return Ok(());
    };

    let close = Body::Close {
        message_id: Uuid::new_v4().to_string(),
    };
    let message = self.command(controller, close);
    self.send_message(message).await
}
/// Handles a status reply from a controller.
///
/// While a connection is being negotiated, an OK status from the expected
/// controller for the expected `Ready` message completes the handshake:
/// a previously connected controller is sent a close message, the
/// connection state is set, a `Connected` event is emitted, and a fresh user
/// token is fetched and applied to the player settings. Outside of a
/// negotiation an OK status is accepted without further action.
///
/// # Errors
///
/// Fails when the status is not OK, when the reply does not match the
/// pending negotiation, or when closing the previous connection or fetching
/// the user token fails.
async fn handle_status(
&mut self,
from: DeviceId,
command_id: &str,
status: Status,
) -> Result<()> {
if status != Status::OK {
return Err(Error::failed_precondition(format!(
"controller failed to process {command_id}"
)));
}
if let DiscoveryState::Connecting {
controller,
ready_message_id,
} = self.discovery_state.clone()
{
if from == controller && command_id == ready_message_id {
// `send_close` addresses the connected controller first, so this
// notifies the controller that is being replaced.
if self.is_connected() {
self.send_close().await?;
}
// Stay discoverable only when interruptions are allowed.
if self.interruptions {
self.discovery_state = DiscoveryState::Available;
} else {
self.discovery_state = DiscoveryState::Taken;
}
self.connection_state = ConnectionState::Connected {
controller: from,
session_id: Uuid::new_v4(),
};
info!("connected to {controller}");
if let Err(e) = self.event_tx.send(Event::Connected) {
error!("failed to send connected event: {e}");
}
// Fetch a fresh user token. Its time to live is forwarded to the loop
// in `start` before a fetch error is propagated; on error a zero
// duration is forwarded.
let (user_token, token_ttl) = match self.user_token().await {
Ok((token, ttl)) => (Ok(token), ttl),
Err(e) => (Err(e), Duration::ZERO),
};
if let Err(e) = self.time_to_live_tx.send(token_ttl).await {
error!("failed to send user token time to live: {e}");
}
self.user_token = Some(user_token?);
self.set_player_settings();
return Ok(());
}
return Err(Error::failed_precondition(
"should match controller and ready message".to_string(),
));
}
Ok(())
}
/// Handles a close request from the controller.
///
/// When a controller is connected or connecting, the client unsubscribes
/// from the queue and command channels and resets its state. The state is
/// reset even when unsubscribing fails, as `disconnect` does when sending
/// the close message fails; otherwise the client would keep treating a
/// controller that already closed the session as connected. Without a
/// controller this is a no-op.
///
/// # Errors
///
/// Returns the first unsubscribe error, after the state has been reset.
async fn handle_close(&mut self) -> Result<()> {
    if self.controller().is_none() {
        return Ok(());
    }

    // Attempt both unsubscriptions so that one failure does not skip the other.
    let queue_result = self.unsubscribe(Ident::RemoteQueue).await;
    let command_result = self.unsubscribe(Ident::RemoteCommand).await;

    // Tear down local state regardless of the outcome above.
    self.reset_states();

    queue_result.and(command_result)
}
/// Returns the client to the disconnected, discoverable state.
///
/// Emits a `Disconnected` event when there was a controller, stops the
/// player, re-activates a deactivated initial volume, and flushes the
/// gateway's user token.
fn reset_states(&mut self) {
    if let Some(controller) = self.controller() {
        info!("disconnected from {controller}");
        if let Err(e) = self.event_tx.send(Event::Disconnected) {
            error!("failed to send disconnected event: {e}");
        }
    }

    self.player.stop();

    // The initial volume applies again to the next connection.
    if let InitialVolume::Inactive(volume) = self.initial_volume {
        self.initial_volume = InitialVolume::Active(volume);
    }

    self.gateway.flush_user_token();

    self.connection_state = ConnectionState::Disconnected;
    self.discovery_state = DiscoveryState::Available;
}
/// Installs a queue published by the controller.
///
/// The list is resolved to tracks through the gateway, stored as the current
/// queue and handed to the player. A position deferred by an earlier skip is
/// applied afterwards, and a flow queue is extended right away.
///
/// # Errors
///
/// Fails when the gateway lookup times out or fails, or when extending a
/// flow queue fails.
async fn handle_publish_queue(&mut self, list: queue::List) -> Result<()> {
    let shuffled = if list.shuffled { "(shuffled)" } else { "" };
    info!("setting queue to {} {shuffled}", list.id);

    let lookup = self.gateway.list_to_queue(&list);
    let queue = tokio::time::timeout(Self::NETWORK_TIMEOUT, lookup).await??;
    let tracks = queue.into_iter().map(Track::from).collect::<Vec<_>>();

    self.queue = Some(list);
    self.player.set_queue(tracks);

    if let Some(position) = self.deferred_position.take() {
        self.set_position(position);
    }

    if !self.is_flow() {
        return Ok(());
    }
    self.extend_queue().await
}
async fn send_ping(&mut self) -> Result<()> {
if let Some(controller) = self.controller() {
let ping = Body::Ping {
message_id: Uuid::new_v4().to_string(),
};
let command = self.command(controller.clone(), ping);
return self.send_message(command).await;
}
Err(Error::failed_precondition(
"ping should have an active connection".to_string(),
))
}
async fn extend_queue(&mut self) -> Result<()> {
let user_id = self.user_id();
if let Some(list) = self.queue.as_mut() {
let new_queue =
tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.user_radio(user_id))
.await??;
let new_tracks: Vec<_> = new_queue.into_iter().map(Track::from).collect();
let new_list: Vec<_> = new_tracks
.iter()
.map(|track| queue::Track {
id: track.id().to_string(),
..Default::default()
})
.collect();
debug!("extending queue with {} tracks", new_tracks.len());
list.tracks.extend(new_list);
self.player.extend_queue(new_tracks);
self.refresh_queue().await
} else {
Err(Error::failed_precondition(
"cannot extend queue: queue is missing",
))
}
}
async fn refresh_queue(&mut self) -> Result<()> {
if let Some(controller) = self.controller() {
if let Some(queue) = self.queue.as_mut() {
queue.id = Uuid::new_v4().to_string();
}
self.publish_queue().await?;
let contents = Body::RefreshQueue {
message_id: Uuid::new_v4().to_string(),
};
let channel = self.channel(Ident::RemoteQueue);
let refresh_queue = self.message(controller.clone(), channel, contents);
self.send_message(refresh_queue).await
} else {
Err(Error::failed_precondition(
"refresh should have an active connection".to_string(),
))
}
}
async fn handle_refresh_queue(&mut self) -> Result<()> {
if let Some(queue) = self.queue.as_mut() {
queue.id = Uuid::new_v4().to_string();
self.publish_queue().await?;
self.report_playback_progress().await
} else {
Err(Error::failed_precondition(
"queue refresh should have a published queue".to_string(),
))
}
}
async fn publish_queue(&mut self) -> Result<()> {
if let Some(controller) = self.controller() {
if let Some(queue) = self.queue.as_ref() {
let contents = Body::PublishQueue {
message_id: Uuid::new_v4().to_string(),
queue: queue.clone(),
};
let channel = self.channel(Ident::RemoteQueue);
let publish_queue = self.message(controller.clone(), channel, contents);
self.send_message(publish_queue).await
} else {
Err(Error::failed_precondition(
"queue refresh should have a published queue".to_string(),
))
}
} else {
Err(Error::failed_precondition(
"queue refresh should have an active connection".to_string(),
))
}
}
async fn send_acknowledgement(&mut self, acknowledgement_id: &str) -> Result<()> {
if let Some(controller) = self.controller() {
let acknowledgement = Body::Acknowledgement {
message_id: Uuid::new_v4().to_string(),
acknowledgement_id: acknowledgement_id.to_string(),
};
let command = self.command(controller, acknowledgement);
return self.send_message(command).await;
}
Err(Error::failed_precondition(
"acknowledgement should have an active connection".to_string(),
))
}
#[expect(clippy::too_many_arguments)]
async fn handle_skip(
&mut self,
message_id: &str,
queue_id: Option<&str>,
item: Option<QueueItem>,
progress: Option<Percentage>,
should_play: Option<bool>,
set_shuffle: Option<bool>,
set_repeat_mode: Option<RepeatMode>,
set_volume: Option<Percentage>,
) -> Result<()> {
if self.controller().is_some() {
self.send_acknowledgement(message_id).await?;
let refresh_queue = self.queue.as_ref().map(|queue| queue.shuffled) != set_shuffle;
if self
.set_player_state(
queue_id,
item,
progress,
should_play,
set_shuffle,
set_repeat_mode,
set_volume,
)
.is_err()
{
return self.disconnect().await;
}
if refresh_queue && self.queue.as_ref().map(|queue| queue.shuffled) == set_shuffle {
if let Err(e) = self.refresh_queue().await {
error!("error refreshing queue: {e}");
}
}
if let Err(e) = self.report_playback_progress().await {
error!("error reporting playback progress: {e}");
}
let status = if self.queue.is_some() {
Status::OK
} else {
Status::Error
};
self.send_status(message_id, status).await?;
Ok(())
} else {
Err(Error::failed_precondition(
"skip should have an active connection".to_string(),
))
}
}
/// Moves the player to `position`.
///
/// For a shuffled queue the position is first translated through the
/// queue's `tracks_order`; when that has no entry for `position`, the
/// position is used unchanged.
#[inline]
fn set_position(&mut self, position: usize) {
    let resolved = self
        .queue
        .as_ref()
        .filter(|queue| queue.shuffled)
        .and_then(|queue| queue.tracks_order.get(position))
        .map_or(position, |ordered| *ordered as usize);

    self.player.set_position(resolved);
}
/// Applies the state requested by a controller to the player.
///
/// Every argument is optional; only the aspects that are present are
/// changed. Processing continues after a failure, and the last error
/// encountered is returned.
///
/// # Errors
///
/// Returns an error when setting the progress, starting the player or
/// changing the playing state fails.
#[expect(clippy::too_many_arguments)]
pub fn set_player_state(
&mut self,
queue_id: Option<&str>,
item: Option<QueueItem>,
progress: Option<Percentage>,
should_play: Option<bool>,
set_shuffle: Option<bool>,
set_repeat_mode: Option<RepeatMode>,
set_volume: Option<Percentage>,
) -> Result<()> {
let mut result = Ok(());
let current = self.player.position();
let mut target = current;
if let Some(item) = item {
target = item.position;
// Apply the position only when the command refers to the local queue;
// otherwise keep it until the matching queue is published.
if self
.queue
.as_ref()
.is_some_and(|local| queue_id.is_some_and(|remote| local.id == remote))
{
self.set_position(target);
} else {
self.deferred_position = Some(target);
}
}
// Progress is only applied when the requested position equals the
// position the player had on entry.
if target == current {
if let Some(progress) = progress {
if self
.player
.track()
.is_some_and(super::track::Track::is_livestream)
{
trace!("ignoring set_progress for livestream");
} else if let Err(e) = self.player.set_progress(progress) {
error!("error setting playback position: {e}");
result = Err(e);
}
}
}
if let Some(shuffle) = set_shuffle {
// Only act when the requested mode differs from the queue's mode.
if self
.queue
.as_ref()
.is_some_and(|queue| queue.shuffled != shuffle)
{
if shuffle {
self.shuffle_queue(ShuffleAction::Shuffle);
} else {
self.shuffle_queue(ShuffleAction::Unshuffle);
}
// Pass the new track order to the player; ids that fail to parse are skipped.
if let Some(queue) = self.queue.as_mut() {
let reordered_queue: Vec<_> = queue
.tracks
.iter()
.filter_map(|track| track.id.parse().ok())
.collect();
self.player.reorder_queue(&reordered_queue);
}
}
}
if let Some(repeat_mode) = set_repeat_mode {
self.player.set_repeat_mode(repeat_mode);
}
if let Some(mut volume) = set_volume {
// While the initial volume is active, a request for 100% is replaced by
// the initial volume; a lower request deactivates the initial volume.
if let InitialVolume::Active(initial_volume) = self.initial_volume {
if volume < Percentage::ONE_HUNDRED {
self.initial_volume = InitialVolume::Inactive(initial_volume);
} else {
volume = initial_volume;
}
}
self.player.set_volume(volume);
}
if let Some(should_play) = should_play {
if should_play {
// Start the player before playing, then apply an active initial volume.
match self.player.start() {
Ok(()) => {
if let InitialVolume::Active(initial_volume) = self.initial_volume {
self.player.set_volume(initial_volume);
}
}
Err(e) => {
error!("error opening output device: {e}");
result = Err(e);
}
}
}
if let Err(e) = self.player.set_playing(should_play) {
error!("error setting playback state: {e}");
result = Err(e);
}
}
result
}
/// Shuffles or unshuffles the current queue in place. Does nothing when
/// there is no queue.
///
/// Shuffling randomizes the track order and stores the permutation in
/// `tracks_order`, where entry `i` is the original position of the track now
/// at position `i`. Unshuffling uses `tracks_order` to restore the original
/// order and then clears it.
///
/// When unshuffling, `tracks_order` entries that do not refer to an existing
/// track are skipped rather than indexed, so an order list that is longer
/// than the track list cannot cause a panic. As before, original positions
/// without an entry in `tracks_order` are left out, and for duplicate
/// entries the first one wins.
#[expect(clippy::cast_possible_truncation)]
fn shuffle_queue(&mut self, action: ShuffleAction) {
    let Some(queue) = self.queue.as_mut() else {
        return;
    };

    match action {
        ShuffleAction::Shuffle => {
            info!("shuffling queue");
            let len = queue.tracks.len();
            let mut order: Vec<usize> = (0..len).collect();
            order.shuffle(&mut rand::rng());

            // `order` is a permutation of `0..len`, so indexing is in bounds.
            let mut tracks = Vec::with_capacity(len);
            for i in &order {
                tracks.push(queue.tracks[*i].clone());
            }

            queue.tracks = tracks;
            queue.tracks_order = order.iter().map(|position| *position as u32).collect();
            queue.shuffled = true;
        }
        ShuffleAction::Unshuffle => {
            info!("unshuffling queue");
            let len = queue.tracks.len();

            // Invert the permutation in one pass: `shuffled_at[original]` is the
            // first shuffled position that holds the track originally at
            // `original`. Out-of-range entries are ignored.
            let mut shuffled_at: Vec<Option<usize>> = vec![None; len];
            for (shuffled, original) in queue.tracks_order.iter().enumerate() {
                if let Some(slot) = shuffled_at.get_mut(*original as usize) {
                    if slot.is_none() {
                        *slot = Some(shuffled);
                    }
                }
            }

            let mut tracks = Vec::with_capacity(len);
            for shuffled in shuffled_at.into_iter().flatten() {
                // Checked access: `tracks_order` may be longer than `tracks`.
                if let Some(track) = queue.tracks.get(shuffled) {
                    tracks.push(track.clone());
                }
            }

            queue.tracks = tracks;
            queue.tracks_order = Vec::new();
            queue.shuffled = false;
        }
    }
}
async fn send_status(&mut self, command_id: &str, status: Status) -> Result<()> {
if let Some(controller) = self.controller() {
let status = Body::Status {
message_id: Uuid::new_v4().to_string(),
command_id: command_id.to_string(),
status,
};
let command = self.command(controller.clone(), status);
return self.send_message(command).await;
}
Err(Error::failed_precondition(
"status should have an active connection".to_string(),
))
}
/// Reports the player's current track, position and state to the
/// controller and re-arms the reporting timer.
///
/// Nothing is sent when the player has no track, or when the current track
/// is playing, has reached 100% and a next track is available.
///
/// # Errors
///
/// Fails with a failed-precondition error when there is no controller, with
/// an internal error when there is a track but no queue, or when the message
/// cannot be sent.
#[expect(clippy::cast_possible_truncation)]
async fn report_playback_progress(&mut self) -> Result<()> {
// Re-arm first so the timer keeps running whatever the outcome below.
self.reset_reporting_timer();
if let Some(controller) = self.controller() {
if let Some(track) = self.player.track() {
let queue = self
.queue
.as_ref()
.ok_or_else(|| Error::internal("no active queue"))?;
let player_position = self.player.position();
let mut position = player_position;
let progress = self.player.progress();
// Skip the report while the finished track is about to be followed
// by the next one.
if self.player.is_playing()
&& progress.is_some_and(|progress| progress >= Percentage::ONE_HUNDRED)
&& self.player.next_track().is_some()
{
return Ok(());
}
// For a shuffled queue, report the index within `tracks_order` that
// holds the player's position (0 when it is not found).
if queue.shuffled {
position = queue
.tracks_order
.iter()
.position(|i| *i == player_position as u32)
.unwrap_or_default();
}
let item = QueueItem {
queue_id: queue.id.to_string(),
track_id: track.id(),
position,
};
let progress = Body::PlaybackProgress {
message_id: Uuid::new_v4().to_string(),
track: item,
quality: track.quality(),
duration: self.player.duration(),
buffered: track.buffered(),
volume: self.player.volume(),
is_playing: self.player.is_playing(),
is_shuffle: queue.shuffled,
repeat_mode: self.player.repeat_mode(),
progress,
};
let command = self.command(controller.clone(), progress);
self.send_message(command).await?;
}
Ok(())
} else {
Err(Error::failed_precondition(
"playback progress should have an active connection".to_string(),
))
}
}
/// Handles one websocket message.
///
/// Text messages are parsed as protocol messages and dispatched; websocket
/// pings are answered with a pong; other frame types are ignored. Parse and
/// dispatch errors are logged and do not stop the client.
///
/// Returns `ControlFlow::Break` when the server closed the connection, or
/// when disconnecting after playback started on another device fails.
async fn handle_message(&mut self, message: &WebsocketMessage) -> ControlFlow<Error, ()> {
match message {
WebsocketMessage::Text(message) => {
match serde_json::from_str::<Message>(message.as_str()) {
Ok(message) => {
match message.clone() {
Message::Receive { contents, .. } => {
let from = contents.headers.from;
// Ignore messages that this device sent itself.
if from == self.device_id {
return ControlFlow::Continue(());
}
let for_another = contents
.headers
.destination
.is_some_and(|destination| destination != self.device_id);
// Log messages meant for this device, or everything when eavesdropping.
if !for_another || self.eavesdrop {
if log_enabled!(Level::Trace) {
trace!("{message:#?}");
} else {
debug!("{message}");
}
}
// Never act on messages for other devices, nor while eavesdropping.
if for_another || self.eavesdrop {
return ControlFlow::Continue(());
}
// Any message from the current controller counts as a sign of life.
if self
.controller()
.is_some_and(|controller| controller == from)
{
self.reset_watchdog_rx();
}
if let Err(e) = self.dispatch(from, contents.body).await {
error!("error handling message: {e}");
}
}
Message::StreamReceive { contents, .. } => {
if self.eavesdrop {
if log_enabled!(Level::Trace) {
trace!("{message:#?}");
} else {
debug!("{message}");
}
}
// A play report for the same user with a different session id
// means playback started elsewhere.
if contents.action == stream::Action::Play {
let value = contents.value;
if value.user == self.user_id() {
if let ConnectionState::Connected { session_id, .. } =
self.connection_state
{
if value.uuid != session_id {
warn!(
"playback started on another device; disconnecting",
);
if let Err(e) = self.disconnect().await {
error!("error disconnecting: {e}");
return ControlFlow::Break(e);
}
}
}
}
}
return ControlFlow::Continue(());
}
_ => {
trace!("ignoring unexpected message: {message:#?}");
}
}
}
Err(e) => {
error!("error parsing message: {e}");
debug!("{message:#?}");
}
}
}
WebsocketMessage::Ping(payload) => {
// Echo the ping payload back in a pong frame.
debug!("ping -> pong");
let pong = Frame::pong(payload.clone());
if let Err(e) = self.send_frame(WebsocketMessage::Frame(pong)).await {
error!("{e}");
}
}
WebsocketMessage::Close(payload) => {
return ControlFlow::Break(Error::aborted(format!(
"connection closed by server: {payload:?}"
)));
}
_ => {
trace!("ignoring unimplemented frame: {message:#?}");
}
}
ControlFlow::Continue(())
}
/// Routes a received message body to its handler.
///
/// `from` is the sender taken from the message headers. It is only passed
/// on for status replies; connect and discovery requests use the `from`
/// field of the body instead.
///
/// # Errors
///
/// Returns the error of the handler that was invoked.
async fn dispatch(&mut self, from: DeviceId, body: Body) -> Result<()> {
match body {
Body::Acknowledgement { .. } => Ok(()),
Body::Close { .. } => self.handle_close().await,
// `from` below shadows the header sender with the one in the body.
Body::Connect { from, offer_id, .. } => self.handle_connect(from, offer_id).await,
Body::DiscoveryRequest {
from,
discovery_session,
..
} => self.handle_discovery_request(from, discovery_session).await,
// A ping from the controller is answered with an acknowledgement.
Body::Ping { message_id } => self.send_acknowledgement(&message_id).await,
Body::PublishQueue { queue, .. } => self.handle_publish_queue(queue).await,
Body::RefreshQueue { .. } => self.handle_refresh_queue().await,
Body::Skip {
message_id,
queue_id,
track,
progress,
should_play,
set_shuffle,
set_repeat_mode,
set_volume,
} => {
self.handle_skip(
&message_id,
queue_id.as_deref(),
track,
progress,
should_play,
set_shuffle,
set_repeat_mode,
set_volume,
)
.await
}
Body::Status {
command_id, status, ..
} => self.handle_status(from, &command_id, status).await,
// A stop request pauses the player.
Body::Stop { .. } => {
self.player.pause();
Ok(())
}
Body::ConnectionOffer { .. } | Body::PlaybackProgress { .. } | Body::Ready { .. } => {
trace!("ignoring message intended for a controller");
Ok(())
}
}
}
async fn send_frame(&mut self, frame: WebsocketMessage) -> Result<()> {
match &mut self.websocket_tx {
Some(tx) => tx.send(frame).await.map_err(Into::into),
None => Err(Error::unavailable(
"websocket stream unavailable".to_string(),
)),
}
}
/// Serializes `message` to JSON and sends it as a websocket text message.
///
/// The transmit watchdog is re-armed before sending, and the message is
/// logged (pretty-printed at trace level, compact at debug level).
///
/// # Errors
///
/// Fails when serialization or sending fails.
async fn send_message(&mut self, message: Message) -> Result<()> {
    self.reset_watchdog_tx();

    if log_enabled!(Level::Trace) {
        trace!("{message:#?}");
    } else {
        debug!("{message}");
    }

    let text = serde_json::to_string(&message)?;
    self.send_frame(WebsocketMessage::Text(text.into())).await
}
/// Subscribes to the channel identified by `ident`, unless already
/// subscribed. The subscription is recorded only after the request was sent.
///
/// # Errors
///
/// Fails when the subscribe message cannot be sent.
async fn subscribe(&mut self, ident: Ident) -> Result<()> {
    if self.subscriptions.contains(&ident) {
        return Ok(());
    }

    let request = Message::Subscribe {
        channel: self.channel(ident),
    };
    self.send_message(request).await?;
    self.subscriptions.insert(ident);
    Ok(())
}
/// Unsubscribes from the channel identified by `ident`, if subscribed. The
/// subscription is forgotten only after the request was sent.
///
/// # Errors
///
/// Fails when the unsubscribe message cannot be sent.
async fn unsubscribe(&mut self, ident: Ident) -> Result<()> {
    if !self.subscriptions.contains(&ident) {
        return Ok(());
    }

    let request = Message::Unsubscribe {
        channel: self.channel(ident),
    };
    self.send_message(request).await?;
    self.subscriptions.remove(&ident);
    Ok(())
}
/// Returns the user id of the current user token, or `UserId::Unspecified`
/// when no token has been obtained yet.
#[must_use]
#[inline]
fn user_id(&self) -> UserId {
    if let Some(token) = self.user_token.as_ref() {
        token.user_id
    } else {
        UserId::Unspecified
    }
}
/// Builds the channel for `ident`, addressed to the current user.
///
/// The channel's sender is the current user as well, except for user feed
/// channels, whose sender is left unspecified.
#[must_use]
#[inline]
fn channel(&self, ident: Ident) -> Channel {
    let to = self.user_id();
    let from = if matches!(ident, Ident::UserFeed(_)) {
        UserId::Unspecified
    } else {
        to
    };

    Channel { from, to, ident }
}
}