use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::{watch, Mutex};
use crate::client::MonoClient;
use crate::types::{MonoEvent, PlayStatus, QueuedTrack};
/// Snapshot of the player that is written to / read from `state.json`.
///
/// Produced by `Player::get_state` and consumed by `Player::restore_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayerState {
/// Track that was loaded when the snapshot was taken, if any.
pub current_track: Option<QueuedTrack>,
/// Sink position in seconds at snapshot time.
pub position_secs: f32,
/// Upcoming tracks, front of the queue first.
pub queue: Vec<QueuedTrack>,
/// Previously played tracks, oldest first (the most recent one is last).
pub history: Vec<QueuedTrack>,
/// User volume; `Player::set_volume` clamps it to `0.0..=1.0`.
pub volume: f32,
/// Pre-amplification factor multiplied with `volume`; falls back to
/// `default_preamp()` when the field is absent from the JSON.
#[serde(default = "default_preamp")]
pub preamp: f32,
}
/// Serde fallback for `PlayerState::preamp`: unity gain.
fn default_preamp() -> f32 {
    const UNITY_GAIN: f32 = 1.0;
    UNITY_GAIN
}
impl PlayerState {
fn state_path() -> PathBuf {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".plexus/monochrome/player/state.json")
}
pub fn load() -> Option<Self> {
let path = Self::state_path();
let data = std::fs::read_to_string(&path).ok()?;
serde_json::from_str(&data).ok()
}
pub fn save(&self) {
let path = Self::state_path();
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Ok(json) = serde_json::to_string_pretty(self) {
let _ = std::fs::write(&path, json);
}
}
}
/// Object-safe alias for "seekable byte stream that may cross threads",
/// used to box audio readers of different concrete types.
trait ReadSeekSend: std::io::Read + std::io::Seek + Send + Sync {}

/// Every type that satisfies the supertraits is a `ReadSeekSend`.
impl<T> ReadSeekSend for T where T: std::io::Read + std::io::Seek + Send + Sync {}
/// Value published on the now-playing watch channel.
///
/// Built by `Player::broadcast_now_playing` from the current player state.
#[derive(Debug, Clone)]
pub struct NowPlaying {
/// Id of the current track, `None` when nothing is loaded.
pub track_id: Option<u64>,
/// Title of the current track.
pub title: Option<String>,
/// Artist of the current track.
pub artist: Option<String>,
/// Album of the current track.
pub album: Option<String>,
/// Playback status at the time of the broadcast.
pub status: PlayStatus,
/// Sink position in seconds.
pub position_secs: f32,
/// Track duration in seconds; `0.0` when there is no current track.
pub duration_secs: f32,
/// User volume.
pub volume: f32,
/// Pre-amplification factor.
pub preamp: f32,
/// Number of tracks waiting in the queue (the current track is not counted).
pub queue_length: usize,
/// `https://monochrome.tf/track/t/<id>` link for the current track.
pub url: Option<String>,
}
impl Default for NowPlaying {
fn default() -> Self {
Self {
track_id: None,
title: None,
artist: None,
album: None,
status: PlayStatus::Idle,
position_secs: 0.0,
duration_secs: 0.0,
volume: 1.0,
preamp: 1.0,
queue_length: 0,
url: None,
}
}
}
/// Mutable player state guarded by `Player::inner`.
struct PlayerInner {
/// Upcoming tracks; the front is played next.
queue: VecDeque<QueuedTrack>,
/// Track currently loaded (buffering, playing or paused), if any.
current_track: Option<QueuedTrack>,
/// Current playback status.
status: PlayStatus,
/// User volume, multiplied with `preamp` before being applied to the sink.
volume: f32,
/// Pre-amplification factor.
preamp: f32,
/// Tracks that were current before, oldest first.
history: Vec<QueuedTrack>,
/// Already-opened stream readers keyed by track id; filled by
/// `prefetch_queue` and consumed by `start_playback`.
prefetched: HashMap<u64, Box<dyn ReadSeekSend>>,
}
/// Audio player: owns the rodio sink, the queue/history state and the
/// now-playing broadcast channel.
pub struct Player {
/// Sink all decoded audio is appended to.
sink: Arc<rodio::Sink>,
/// Queue, history, status and gain settings.
inner: Mutex<PlayerInner>,
/// Sending half of the now-playing watch channel.
now_playing_tx: watch::Sender<NowPlaying>,
/// Receiver kept so `subscribe_now_playing` can hand out clones.
now_playing_rx: watch::Receiver<NowPlaying>,
/// API client used for track info, album listings and stream manifests.
client: Arc<MonoClient>,
/// Never sent on; the audio thread blocks on the receiving end, so dropping
/// this sender (when the `Player` is dropped) lets that thread return.
_shutdown_tx: std::sync::mpsc::Sender<()>,
}
impl Player {
/// Creates the player, starts its background workers and restores any
/// state saved on disk.
///
/// Spawns one OS thread that owns the audio output stream and three tokio
/// tasks (position ticker, auto-advance, prefetch trigger), then sets up
/// media controls and calls `restore_state`.
///
/// # Panics
///
/// Panics if the default audio output device or the sink cannot be created:
/// the audio thread panics, its channel sender is dropped, and the `recv`
/// below fails.
pub async fn new(client: Arc<MonoClient>) -> Arc<Self> {
let (sink_tx, sink_rx) = std::sync::mpsc::channel();
let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel::<()>();
// Dedicated audio thread: `_stream` stays alive on this thread until the
// shutdown channel is closed (nothing is ever sent on it; `recv` returns
// once `shutdown_tx` is dropped together with the `Player`).
std::thread::spawn(move || {
let (_stream, handle) = rodio::OutputStream::try_default()
.expect("failed to open default audio output device");
let sink = rodio::Sink::try_new(&handle)
.expect("failed to create audio sink");
let _ = sink_tx.send(sink);
let _ = shutdown_rx.recv();
});
// Blocks until the audio thread has handed over the sink.
let sink = Arc::new(sink_rx.recv().expect("audio thread failed to initialize"));
// Start paused; `start_playback` calls `play()` once a source is appended.
sink.pause();
let (now_playing_tx, now_playing_rx) = watch::channel(NowPlaying::default());
let player = Arc::new(Self {
sink,
inner: Mutex::new(PlayerInner {
queue: VecDeque::new(),
current_track: None,
status: PlayStatus::Idle,
volume: 1.0,
preamp: 1.0,
history: Vec::new(),
prefetched: HashMap::new(),
}),
now_playing_tx,
now_playing_rx,
client,
_shutdown_tx: shutdown_tx,
});
// Position ticker: while playing, re-broadcast once per second so
// subscribers see the position advance. Each task only holds a `Weak`
// between iterations and exits once the player has been dropped.
let weak = Arc::downgrade(&player);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let Some(this) = weak.upgrade() else { break };
let is_playing = {
let inner = this.inner.lock().await;
matches!(inner.status, PlayStatus::Playing)
};
if is_playing {
this.broadcast_now_playing().await;
}
}
});
// Auto-advance: every 250 ms, if the sink has run dry while the status is
// still `Playing`, move the finished track to history and start the next
// queued one (or go idle when the queue is empty).
let weak = Arc::downgrade(&player);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(250)).await;
let Some(this) = weak.upgrade() else { break };
if !this.sink.empty() {
continue;
}
let mut inner = this.inner.lock().await;
if matches!(inner.status, PlayStatus::Playing) {
if let Some(current) = inner.current_track.take() {
inner.history.push(current);
}
if let Some(next) = inner.queue.pop_front() {
// Mark as buffering before releasing the lock so the next poll
// does not treat the still-empty sink as another finished track.
inner.status = PlayStatus::Buffering;
drop(inner);
if let Err(e) = this.start_playback(next).await {
tracing::error!("auto-advance failed: {e}");
let mut inner = this.inner.lock().await;
inner.status = PlayStatus::Idle;
inner.current_track = None;
drop(inner);
this.broadcast_now_playing().await;
}
this.save_state().await;
} else {
inner.status = PlayStatus::Idle;
inner.current_track = None;
drop(inner);
this.broadcast_now_playing().await;
this.save_state().await;
}
}
}
});
// Prefetch trigger: every 2 s while playing, call `prefetch_queue` once
// whenever the current track id differs from the last one seen.
let weak = Arc::downgrade(&player);
tokio::spawn(async move {
let mut last_track_id: Option<u64> = None;
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
let Some(this) = weak.upgrade() else { break };
let current_id = {
let inner = this.inner.lock().await;
if !matches!(inner.status, PlayStatus::Playing) {
continue;
}
inner.current_track.as_ref().map(|t| t.id)
};
if current_id != last_track_id {
last_track_id = current_id;
this.prefetch_queue().await;
}
}
});
player.setup_media_controls();
player.restore_state().await;
player
}
/// Starts the `media-controls` thread that bridges the OS media controls
/// (via `souvlaki`) and this player.
///
/// Incoming control events (play, pause, toggle, next, previous) are
/// forwarded to the player on the current tokio runtime; every 500 ms the
/// thread pushes the latest `NowPlaying` snapshot back as metadata and
/// playback state. The thread holds only a `Weak` reference and exits once
/// the player has been dropped.
///
/// Media controls are optional: every failure, including failing to spawn
/// the thread itself, is logged as a warning and otherwise ignored.
///
/// Must be called from within a tokio runtime (`Handle::current`).
fn setup_media_controls(self: &Arc<Self>) {
    use souvlaki::{
        MediaControlEvent, MediaControls, MediaMetadata, MediaPlayback, MediaPosition,
        PlatformConfig,
    };
    let tokio_handle = tokio::runtime::Handle::current();
    let weak = Arc::downgrade(self);
    let mut np_rx = self.subscribe_now_playing();
    let spawned = std::thread::Builder::new()
        .name("media-controls".into())
        .spawn(move || {
            let config = PlatformConfig {
                dbus_name: "plexus_mono",
                display_name: "Plexus Mono",
                hwnd: None,
            };
            let mut controls = match MediaControls::new(config) {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!("media controls unavailable: {e:?}");
                    return;
                }
            };
            let weak2 = weak.clone();
            let handle = tokio_handle.clone();
            if let Err(e) = controls.attach(move |event: MediaControlEvent| {
                let Some(player) = weak2.upgrade() else {
                    return;
                };
                // The callback is synchronous; hop onto the runtime to call
                // the async player API.
                handle.spawn(async move {
                    match event {
                        MediaControlEvent::Play => player.resume().await,
                        MediaControlEvent::Pause => player.pause().await,
                        MediaControlEvent::Toggle => {
                            let is_playing = {
                                let inner = player.inner.lock().await;
                                matches!(inner.status, PlayStatus::Playing)
                            };
                            if is_playing {
                                player.pause().await;
                            } else {
                                player.resume().await;
                            }
                        }
                        MediaControlEvent::Next => {
                            let _ = player.next().await;
                        }
                        MediaControlEvent::Previous => {
                            let _ = player.previous().await;
                        }
                        _ => {}
                    }
                });
            }) {
                tracing::warn!("failed to attach media controls: {e:?}");
                return;
            }
            tracing::info!("media controls active (Now Playing + media keys)");
            // Placeholder metadata until the first real snapshot arrives.
            let _ = controls.set_metadata(MediaMetadata {
                title: Some("Plexus Mono"),
                artist: None,
                album: None,
                duration: None,
                cover_url: None,
            });
            let _ = controls.set_playback(MediaPlayback::Paused { progress: None });
            loop {
                std::thread::sleep(Duration::from_millis(500));
                if !np_rx.has_changed().unwrap_or(false) {
                    // Nothing new (or the channel is closed): stop once the
                    // player itself is gone.
                    if weak.upgrade().is_none() {
                        break;
                    }
                    continue;
                }
                let np = np_rx.borrow_and_update().clone();
                let _ = controls.set_metadata(MediaMetadata {
                    title: np.title.as_deref(),
                    artist: np.artist.as_deref(),
                    album: np.album.as_deref(),
                    duration: if np.duration_secs > 0.0 {
                        Some(Duration::from_secs_f32(np.duration_secs))
                    } else {
                        None
                    },
                    // No cover URL is published yet.
                    cover_url: None,
                });
                let playback = match np.status {
                    PlayStatus::Playing => MediaPlayback::Playing {
                        progress: Some(MediaPosition(Duration::from_secs_f32(
                            np.position_secs,
                        ))),
                    },
                    PlayStatus::Paused => MediaPlayback::Paused {
                        progress: Some(MediaPosition(Duration::from_secs_f32(
                            np.position_secs,
                        ))),
                    },
                    _ => MediaPlayback::Stopped,
                };
                let _ = controls.set_playback(playback);
            }
        });
    if let Err(e) = spawned {
        tracing::warn!("failed to spawn media-controls thread: {e}");
    }
}
/// Publishes a fresh `NowPlaying` snapshot, built from the locked player
/// state and the sink position, on the watch channel. A failed send is
/// ignored.
async fn broadcast_now_playing(&self) {
    let guard = self.inner.lock().await;
    // `Option<&QueuedTrack>` is `Copy`, so it can be mapped repeatedly.
    let track = guard.current_track.as_ref();
    let snapshot = NowPlaying {
        track_id: track.map(|t| t.id),
        title: track.map(|t| t.title.clone()),
        artist: track.map(|t| t.artist.clone()),
        album: track.map(|t| t.album.clone()),
        status: guard.status.clone(),
        position_secs: self.sink.get_pos().as_secs_f32(),
        duration_secs: track.map_or(0.0, |t| t.duration_secs as f32),
        volume: guard.volume,
        preamp: guard.preamp,
        queue_length: guard.queue.len(),
        url: track.map(|t| format!("https://monochrome.tf/track/t/{}", t.id)),
    };
    let _ = self.now_playing_tx.send(snapshot);
}
/// Makes `track` the current track and starts playing it.
///
/// The status goes `Buffering` -> `Playing`, with a now-playing broadcast
/// at each step.
///
/// # Errors
///
/// Returns the manifest, URL, download or decode error as a string. On
/// failure the player is rolled back to `Idle` with no current track (and
/// the sink is stopped) instead of staying in `Buffering` forever. The
/// rollback is skipped if the status or current track changed in the
/// meantime, so a concurrent pause or a newer playback request is not
/// clobbered.
async fn start_playback(&self, track: QueuedTrack) -> Result<(), String> {
    {
        let mut inner = self.inner.lock().await;
        inner.current_track = Some(track.clone());
        inner.status = PlayStatus::Buffering;
    }
    self.broadcast_now_playing().await;

    match self.load_into_sink(&track).await {
        Ok(()) => {
            {
                let mut inner = self.inner.lock().await;
                inner.status = PlayStatus::Playing;
            }
            self.broadcast_now_playing().await;
            Ok(())
        }
        Err(e) => {
            {
                let mut inner = self.inner.lock().await;
                let still_ours = matches!(inner.status, PlayStatus::Buffering)
                    && inner.current_track.as_ref().map(|t| t.id) == Some(track.id);
                if still_ours {
                    // `Idle` must mean silence: whatever was playing before
                    // is no longer the current track.
                    self.sink.stop();
                    inner.current_track = None;
                    inner.status = PlayStatus::Idle;
                }
            }
            self.broadcast_now_playing().await;
            Err(e)
        }
    }
}

/// Opens the audio for `track` (prefetched reader if available, otherwise a
/// fresh HTTP stream), decodes it and replaces the sink contents with it.
async fn load_into_sink(&self, track: &QueuedTrack) -> Result<(), String> {
    let prefetched: Option<Box<dyn ReadSeekSend>> = {
        let mut inner = self.inner.lock().await;
        inner.prefetched.remove(&track.id)
    };
    let reader: Box<dyn ReadSeekSend> = if let Some(r) = prefetched {
        tracing::debug!("using prefetched audio for track {}", track.id);
        r
    } else {
        let manifest = self.client.stream_manifest(track.id, &track.quality).await?;
        let url = match &manifest {
            MonoEvent::StreamManifest { url, .. } => url.clone(),
            _ => return Err("unexpected manifest type".to_string()),
        };
        let r = stream_download::StreamDownload::new_http(
            url.parse::<reqwest::Url>()
                .map_err(|e| format!("bad stream url: {e}"))?,
            stream_download::storage::temp::TempStorageProvider::new(),
            stream_download::Settings::default(),
        )
        .await
        .map_err(|e| format!("stream download error: {e}"))?;
        Box::new(r)
    };
    // Decoder construction reads from the stream, so keep it off the async
    // worker threads.
    let source = tokio::task::spawn_blocking(move || rodio::Decoder::new(reader))
        .await
        .map_err(|e| format!("decoder task panicked: {e}"))?
        .map_err(|e| format!("audio decode error: {e}"))?;
    self.sink.stop();
    self.sink.append(source);
    self.sink.play();
    Ok(())
}
/// Plays track `id` immediately at the given quality.
///
/// Track metadata is looked up first; if the lookup fails a placeholder
/// entry is used. The track that was current, if any, is appended to the
/// history. The queue is left untouched.
///
/// # Errors
///
/// Propagates the error from `start_playback`.
pub async fn play_track(&self, id: u64, quality: &str) -> Result<(), String> {
    let info = self.client.track_info(id).await.ok();
    let track = make_queued_track(id, quality, info);
    {
        let mut guard = self.inner.lock().await;
        let finished = guard.current_track.take();
        // Extending with an `Option` pushes the value only when it is `Some`.
        guard.history.extend(finished);
    }
    self.start_playback(track).await
}
/// Pauses the sink and, if the player was playing or buffering, marks it
/// as paused. Always broadcasts the resulting state.
pub async fn pause(&self) {
    self.sink.pause();
    {
        let mut guard = self.inner.lock().await;
        let active = matches!(guard.status, PlayStatus::Playing | PlayStatus::Buffering);
        if active {
            guard.status = PlayStatus::Paused;
        }
    }
    self.broadcast_now_playing().await;
}
/// Un-pauses the sink and, if the player was paused, marks it as playing.
/// Always broadcasts the resulting state.
pub async fn resume(&self) {
    self.sink.play();
    {
        let mut guard = self.inner.lock().await;
        let was_paused = matches!(guard.status, PlayStatus::Paused);
        if was_paused {
            guard.status = PlayStatus::Playing;
        }
    }
    self.broadcast_now_playing().await;
}
/// Stops playback: empties the sink, moves the current track (if any) to
/// the history, sets the status to `Stopped` and drops all prefetched
/// readers. The queue is kept. Broadcasts and persists the new state.
pub async fn stop(&self) {
    self.sink.stop();
    {
        let mut guard = self.inner.lock().await;
        let finished = guard.current_track.take();
        // Pushes only when there was a current track.
        guard.history.extend(finished);
        guard.status = PlayStatus::Stopped;
        guard.prefetched.clear();
    }
    self.broadcast_now_playing().await;
    self.save_state().await;
}
/// Skips to the next queued track.
///
/// The sink is stopped and the current track (if any) is moved to the
/// history. With an empty queue the player goes `Idle`. In every case the
/// resulting state is persisted, matching the auto-advance path, so a
/// restart does not resurrect the skipped track.
///
/// # Errors
///
/// Returns `"queue is empty"` when there is nothing to skip to, or the
/// error from `start_playback`.
pub async fn next(&self) -> Result<(), String> {
    self.sink.stop();
    let next = {
        let mut inner = self.inner.lock().await;
        if let Some(current) = inner.current_track.take() {
            inner.history.push(current);
        }
        inner.queue.pop_front()
    };
    let result = match next {
        Some(track) => self.start_playback(track).await,
        None => {
            {
                let mut inner = self.inner.lock().await;
                inner.status = PlayStatus::Idle;
            }
            self.broadcast_now_playing().await;
            Err("queue is empty".to_string())
        }
    };
    self.save_state().await;
    result
}
/// Goes back one track.
///
/// More than 5 seconds into the current track, that track is simply
/// restarted. Otherwise the most recent history entry becomes the current
/// track and the track that was current is put back at the front of the
/// queue; the new state is persisted.
///
/// # Errors
///
/// Returns `"no previous track"` when the history is empty. In that case
/// nothing is modified: playback, queue and current track are left as they
/// were. Also propagates errors from `start_playback`.
pub async fn previous(&self) -> Result<(), String> {
    if self.sink.get_pos().as_secs_f32() > 5.0 {
        let track = {
            let inner = self.inner.lock().await;
            inner.current_track.clone()
        };
        if let Some(track) = track {
            return self.start_playback(track).await;
        }
    }
    let prev = {
        let mut inner = self.inner.lock().await;
        // Check the history before touching anything so the error path
        // leaves the player exactly as it was.
        let Some(prev) = inner.history.pop() else {
            return Err("no previous track".to_string());
        };
        if let Some(current) = inner.current_track.take() {
            inner.queue.push_front(current);
        }
        prev
    };
    self.sink.stop();
    let result = self.start_playback(prev).await;
    self.save_state().await;
    result
}
/// Sets the sink volume to `preamp * volume` from the given state.
fn apply_volume(&self, inner: &PlayerInner) {
    let gain = inner.preamp * inner.volume;
    self.sink.set_volume(gain);
}
pub async fn set_volume(&self, level: f32) {
let level = level.clamp(0.0, 1.0);
let mut inner = self.inner.lock().await;
inner.volume = level;
self.apply_volume(&inner);
drop(inner);
self.broadcast_now_playing().await;
self.save_state().await;
}
pub async fn set_preamp(&self, level: f32) {
let level = level.clamp(0.0, 4.0);
let mut inner = self.inner.lock().await;
inner.preamp = level;
self.apply_volume(&inner);
drop(inner);
self.broadcast_now_playing().await;
self.save_state().await;
}
/// Adds track `id` to the player.
///
/// When the player is idle or stopped the track starts playing right away
/// and is not queued; otherwise it is appended to the queue. Metadata
/// lookup failures fall back to a placeholder entry. State is persisted in
/// either case.
///
/// # Errors
///
/// Propagates the error from `start_playback` when the track was started
/// immediately.
pub async fn queue_add(&self, id: u64, quality: &str) -> Result<(), String> {
    let info = self.client.track_info(id).await.ok();
    let track = make_queued_track(id, quality, info);
    let start_now = {
        let mut guard = self.inner.lock().await;
        let idle = matches!(guard.status, PlayStatus::Idle | PlayStatus::Stopped);
        if !idle {
            guard.queue.push_back(track.clone());
        }
        idle
    };
    let outcome = if start_now {
        self.start_playback(track).await
    } else {
        self.broadcast_now_playing().await;
        Ok(())
    };
    self.save_state().await;
    outcome
}
/// Queues every track of album `album_id` at the given quality.
///
/// When the player is idle or stopped the first track starts playing and
/// the rest are queued; otherwise all tracks are appended to the queue.
/// The updated state is persisted, also when starting the first track
/// fails, because the queue has already been modified by then.
///
/// Returns the tracks that were added, in album order.
///
/// # Errors
///
/// Returns the client error when the album lookup fails, an error when the
/// album contains no tracks, or the error from `start_playback`.
pub async fn queue_album(&self, album_id: u64, quality: &str) -> Result<Vec<QueuedTrack>, String> {
    let (album_event, track_events) = self.client.album(album_id).await?;

    // Album-level metadata shared by every track; absent when the first
    // event is not an `Album`.
    let (album_title, album_cover) = match &album_event {
        MonoEvent::Album { title, cover_id, .. } => (Some(title.clone()), cover_id.clone()),
        _ => (None, None),
    };

    let mut queued_tracks = Vec::new();
    for event in &track_events {
        if let MonoEvent::AlbumTrack { id, title, artist, duration_secs, .. } = event {
            queued_tracks.push(QueuedTrack {
                id: *id,
                title: title.clone(),
                artist: artist.clone(),
                album: album_title.clone().unwrap_or_default(),
                duration_secs: *duration_secs,
                quality: quality.to_string(),
                cover_id: album_cover.clone(),
            });
        }
    }

    if queued_tracks.is_empty() {
        let album_name = album_title.unwrap_or_else(|| format!("Album {album_id}"));
        return Err(format!("no tracks found in album {album_name}"));
    }

    let should_start = {
        let mut inner = self.inner.lock().await;
        let idle = matches!(inner.status, PlayStatus::Idle | PlayStatus::Stopped);
        // When idle, the first track is played directly instead of queued.
        let skip = usize::from(idle);
        inner.queue.extend(queued_tracks.iter().skip(skip).cloned());
        idle
    };
    let started = if should_start {
        self.start_playback(queued_tracks[0].clone()).await
    } else {
        self.broadcast_now_playing().await;
        Ok(())
    };
    self.save_state().await;
    started?;
    Ok(queued_tracks)
}
/// Queues several tracks at once.
///
/// Metadata for all ids is fetched concurrently; failed lookups fall back
/// to placeholder entries. When the player is idle or stopped the first
/// track starts playing and the rest are queued; otherwise all tracks are
/// appended to the queue. The updated state is persisted, also when
/// starting the first track fails, because the queue has already been
/// modified by then.
///
/// Returns the tracks that were added, in the order of `ids`.
///
/// # Errors
///
/// Returns an error when `ids` is empty, or the error from
/// `start_playback`.
pub async fn queue_batch(&self, ids: &[u64], quality: &str) -> Result<Vec<QueuedTrack>, String> {
    if ids.is_empty() {
        return Err("no track IDs provided".into());
    }
    let futs: Vec<_> = ids
        .iter()
        .map(|&id| {
            let client = self.client.clone();
            let q = quality.to_string();
            async move {
                let info = client.track_info(id).await.ok();
                make_queued_track(id, &q, info)
            }
        })
        .collect();
    let tracks: Vec<QueuedTrack> = futures::future::join_all(futs).await;
    let should_start = {
        let mut inner = self.inner.lock().await;
        let idle = matches!(inner.status, PlayStatus::Idle | PlayStatus::Stopped);
        // When idle, the first track is played directly instead of queued.
        let skip = usize::from(idle);
        inner.queue.extend(tracks.iter().skip(skip).cloned());
        idle
    };
    let started = if should_start {
        // `ids` is non-empty, so `tracks` has at least one element.
        self.start_playback(tracks[0].clone()).await
    } else {
        self.broadcast_now_playing().await;
        Ok(())
    };
    self.save_state().await;
    started?;
    Ok(tracks)
}
/// Removes all queued tracks and drops every prefetched reader. The current
/// track keeps playing. Broadcasts and persists the new state, so the
/// cleared queue does not come back after a restart.
pub async fn queue_clear(&self) {
    {
        let mut inner = self.inner.lock().await;
        inner.queue.clear();
        inner.prefetched.clear();
    }
    self.broadcast_now_playing().await;
    self.save_state().await;
}
/// Returns a copy of the current track (if any) and of the queue, front
/// first.
pub async fn queue_get(&self) -> (Option<QueuedTrack>, Vec<QueuedTrack>) {
    let guard = self.inner.lock().await;
    let now = guard.current_track.clone();
    let mut upcoming = Vec::with_capacity(guard.queue.len());
    upcoming.extend(guard.queue.iter().cloned());
    (now, upcoming)
}
/// Moves the queued track at index `from` to index `to` and persists the
/// new order.
///
/// # Errors
///
/// Returns an error when either index is outside the queue; the queue is
/// left unchanged in that case.
pub async fn queue_reorder(&self, from: usize, to: usize) -> Result<(), String> {
    {
        let mut inner = self.inner.lock().await;
        let len = inner.queue.len();
        if from >= len || to >= len {
            return Err(format!("index out of bounds (queue has {} tracks)", len));
        }
        // `from < len` was checked above, so `remove` yields the track.
        if let Some(track) = inner.queue.remove(from) {
            // After the removal the queue holds `len - 1` items and
            // `to <= len - 1`, so the insert index is valid.
            inner.queue.insert(to, track);
        }
    }
    // The lock is released before saving: `save_state` locks `inner` itself.
    self.save_state().await;
    Ok(())
}
/// Opens HTTP streams ahead of time for up to ten queued tracks that have
/// no prefetched reader yet, and stores the readers by track id.
///
/// Tracks are processed one after another; any failure for a track is
/// skipped (manifest and download failures are logged at debug level).
async fn prefetch_queue(&self) {
    let pending: Vec<QueuedTrack> = {
        let guard = self.inner.lock().await;
        let mut picked = Vec::new();
        for candidate in guard.queue.iter() {
            if picked.len() == 10 {
                break;
            }
            if guard.prefetched.contains_key(&candidate.id) {
                continue;
            }
            picked.push(candidate.clone());
        }
        picked
    };

    for track in pending {
        let manifest = match self.client.stream_manifest(track.id, &track.quality).await {
            Ok(found) => found,
            Err(e) => {
                tracing::debug!("prefetch manifest failed for {}: {e}", track.id);
                continue;
            }
        };
        let MonoEvent::StreamManifest { url, .. } = &manifest else {
            continue;
        };
        let Ok(parsed) = url.parse::<reqwest::Url>() else {
            continue;
        };
        let download = stream_download::StreamDownload::new_http(
            parsed,
            stream_download::storage::temp::TempStorageProvider::new(),
            stream_download::Settings::default(),
        )
        .await;
        let reader = match download {
            Ok(opened) => opened,
            Err(e) => {
                tracing::debug!("prefetch download failed for {}: {e}", track.id);
                continue;
            }
        };
        tracing::debug!("prefetched track {} ({})", track.id, track.title);
        let mut guard = self.inner.lock().await;
        guard.prefetched.insert(track.id, Box::new(reader));
    }
}
/// Returns a new receiver for now-playing updates, cloned from the
/// receiver stored in the player.
pub fn subscribe_now_playing(&self) -> watch::Receiver<NowPlaying> {
    watch::Receiver::clone(&self.now_playing_rx)
}
pub async fn get_state(&self) -> PlayerState {
let inner = self.inner.lock().await;
PlayerState {
current_track: inner.current_track.clone(),
position_secs: self.sink.get_pos().as_secs_f32(),
queue: inner.queue.iter().cloned().collect(),
history: inner.history.clone(),
volume: inner.volume,
preamp: inner.preamp,
}
}
pub async fn save_state(&self) {
let state = self.get_state().await;
state.save();
}
pub async fn restore_state(&self) {
if let Some(state) = PlayerState::load() {
let resume_track = state.current_track.clone();
let resume_pos = state.position_secs;
{
let mut inner = self.inner.lock().await;
inner.queue = state.queue.into_iter().collect();
inner.history = state.history;
inner.volume = state.volume;
inner.preamp = state.preamp;
self.apply_volume(&inner);
}
if let Some(track) = resume_track {
tracing::info!(
"resuming '{}' at {:.0}s",
track.title,
resume_pos
);
match self.start_playback(track).await {
Ok(()) => {
self.sink.pause();
let mut inner = self.inner.lock().await;
inner.status = PlayStatus::Paused;
drop(inner);
self.broadcast_now_playing().await;
}
Err(e) => {
tracing::error!("failed to resume track: {e}");
}
}
} else {
self.broadcast_now_playing().await;
}
tracing::info!("restored player state from disk");
}
}
}
/// Builds the queue entry for track `id`.
///
/// When `info` is a `MonoEvent::Track`, its metadata is used. For `None` or
/// any other event a placeholder titled `Track <id>` is returned, with
/// empty artist and album, zero duration and no cover.
fn make_queued_track(id: u64, quality: &str, info: Option<MonoEvent>) -> QueuedTrack {
    let quality = quality.to_string();
    if let Some(MonoEvent::Track {
        title,
        artist,
        album,
        duration_secs,
        cover_id,
        ..
    }) = info
    {
        return QueuedTrack {
            id,
            title,
            artist,
            album,
            duration_secs,
            quality,
            cover_id,
        };
    }
    QueuedTrack {
        id,
        title: format!("Track {id}"),
        artist: String::new(),
        album: String::new(),
        duration_secs: 0,
        quality,
        cover_id: None,
    }
}