use std::sync::mpsc::{Receiver, Sender, channel};
use blake3::Hash;
use selene_core::library::collection::Collectable;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use thiserror::Error;
use crate::{
LoopMode, ShuffleMode,
client::{CallbackFn, IpcHandleError, SeleneClient},
player::{PlaybackStatus, PlayerQueryFlags, QueryResult},
};
#[repr(u8)]
pub(crate) enum PacketType {
Unknown,
Event,
Response,
Error,
Disconnect,
}
#[derive(Debug, Error, Serialize, Deserialize, Clone, Copy)]
pub enum PacketError {
#[error("Packet size '{size}' too large: Max size is {max_size}")]
PacketTooLarge { size: usize, max_size: usize },
#[error("Client was disconnected while waiting for a packet")]
Disconnect,
}
impl From<u8> for PacketType {
fn from(value: u8) -> Self {
match value {
1 => Self::Event,
2 => Self::Response,
3 => Self::Error,
4 => Self::Disconnect,
_ => Self::Unknown,
}
}
}
impl SeleneClient {
pub fn ipc_generic(&self, command: IpcCommand) -> Result<(), PacketError> {
self.handle_tx.no_response(command)
}
pub fn ipc_flush(&self) -> Result<(), PacketError> {
self.handle_tx.request(IpcCommand::Flush)
}
pub fn ipc_disconnect(&self) {
self.handle_tx.action(IpcCommand::Disconnect);
}
pub fn ipc_reload_config(&self) -> Result<Result<(), String>, PacketError> {
self.handle_tx.request(IpcCommand::ReloadConfig)
}
pub fn ipc_play(&self, collectable: Vec<Collectable>) {
self.handle_tx.action(IpcCommand::Play {
collectables: collectable,
});
}
pub fn ipc_stop(&self) {
self.handle_tx.action(IpcCommand::Stop);
}
pub fn ipc_next(&self) {
self.handle_tx.action(IpcCommand::Next);
}
pub fn ipc_previous(&self) {
self.handle_tx.action(IpcCommand::Previous);
}
pub fn ipc_set_is_playing(&self, is_playing: bool) {
self.handle_tx
.action(IpcCommand::SetIsPlaying { is_playing });
}
pub fn ipc_toggle_is_playing(&self) -> Result<PlaybackStatus, PacketError> {
self.handle_tx.request(IpcCommand::TogglePlaying)
}
pub fn ipc_seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PacketError> {
self.handle_tx.request(IpcCommand::Seek {
seconds: time,
increment,
})
}
pub fn ipc_set_volume(&self, volume: f32, increment: bool) -> Result<f32, PacketError> {
self.handle_tx
.request(IpcCommand::SetVolume { volume, increment })
}
pub fn ipc_queue_set(
&self,
tracks: Vec<Collectable>,
expected_state: Hash,
) -> Result<bool, PacketError> {
self.handle_tx.request(IpcCommand::QueueSet {
tracks,
expected_state,
})
}
pub fn ipc_queue_extend(&self, tracks: Vec<Collectable>) {
self.handle_tx.action(IpcCommand::QueueExtend(tracks));
}
pub fn ipc_queue_shuffle(&self) {
self.handle_tx.action(IpcCommand::QueueShuffle);
}
pub fn ipc_queue_clear(&self) {
self.handle_tx.action(IpcCommand::QueueClear);
}
pub fn ipc_playlist_set(
&self,
collectables: Vec<Collectable>,
expected_state: Hash,
) -> Result<bool, PacketError> {
self.handle_tx.request(IpcCommand::PlaylistSet {
collectables,
expected_state,
})
}
pub fn ipc_playlist_extend(&self, collectables: Vec<Collectable>) {
self.handle_tx
.action(IpcCommand::PlaylistExtend(collectables));
}
pub fn ipc_playlist_clear(&self) {
self.handle_tx.action(IpcCommand::PlaylistClear);
}
pub fn ipc_playlist_set_shuffle_mode(&self, shuffle_mode: ShuffleMode) {
self.handle_tx
.action(IpcCommand::PlaylistSetShuffleMode { shuffle_mode });
}
pub fn ipc_playlist_set_loop_mode(&self, loop_mode: LoopMode) {
self.handle_tx
.action(IpcCommand::PlaylistSetLoopMode { loop_mode });
}
pub fn ipc_tracklist_seek(&self, index: isize, increment: bool) -> Result<usize, PacketError> {
self.handle_tx
.request(IpcCommand::TracklistSeek { index, increment })
}
pub fn ipc_query(&self, flags: PlayerQueryFlags) -> Result<QueryResult, PacketError> {
self.handle_tx.request(IpcCommand::GetState { flags })
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[non_exhaustive]
pub enum IpcCommand {
Flush,
Disconnect,
ReloadConfig,
Play {
collectables: Vec<Collectable>,
},
Stop,
SetIsPlaying {
is_playing: bool,
},
TogglePlaying,
Seek {
seconds: f64,
increment: bool,
},
SetVolume {
volume: f32,
increment: bool,
},
QueueSet {
tracks: Vec<Collectable>,
expected_state: Hash,
},
QueueExtend(Vec<Collectable>),
QueueShuffle,
QueueClear,
PlaylistSet {
collectables: Vec<Collectable>,
expected_state: Hash,
},
PlaylistExtend(Vec<Collectable>),
PlaylistClear,
PlaylistSetShuffleMode {
shuffle_mode: ShuffleMode,
},
PlaylistSetLoopMode {
loop_mode: LoopMode,
},
TracklistSeek {
index: isize,
increment: bool,
},
Next,
Previous,
GetState {
flags: PlayerQueryFlags,
},
}
impl IpcCommand {
#[must_use]
pub fn responds(&self) -> bool {
matches!(
self,
IpcCommand::Flush
| IpcCommand::TogglePlaying
| IpcCommand::Seek { .. }
| IpcCommand::SetVolume { .. }
| IpcCommand::QueueSet { .. }
| IpcCommand::PlaylistSet { .. }
| IpcCommand::TracklistSeek { .. }
| IpcCommand::GetState { .. }
)
}
}
pub enum IpcRequest {
Ipc {
command: IpcCommand,
callback: Option<CallbackFn>,
},
Reconnect {
callback: Sender<Result<(), IpcHandleError>>,
},
}
pub trait IpcTx {
fn no_response(&self, command: IpcCommand) -> Result<(), PacketError>;
fn request<T: DeserializeOwned + Send + 'static>(
&self,
command: IpcCommand,
) -> Result<T, PacketError>;
fn action(&self, command: IpcCommand);
fn disconnect(&self);
fn reconnect(&self) -> Result<(), IpcHandleError>;
}
impl IpcTx for Sender<IpcRequest> {
fn no_response(&self, command: IpcCommand) -> Result<(), PacketError> {
self.send(IpcRequest::Ipc {
command,
callback: None,
})
.unwrap();
Ok(())
}
fn request<T: DeserializeOwned + Send + 'static>(
&self,
command: IpcCommand,
) -> Result<T, PacketError> {
assert!(
command.responds(),
"Commands must always respond with request()"
);
let (tx, rx) = channel();
let callback: CallbackFn = Box::new(move |result| {
let _ = tx.send(result.map(|bytes| {
ciborium::from_reader::<T, &[u8]>(bytes).expect("Daemon sent invalid bytes")
}));
});
self.send(IpcRequest::Ipc {
command,
callback: Some(Box::new(callback)),
})
.unwrap();
rx.recv().unwrap()
}
fn action(&self, command: IpcCommand) {
assert!(
!command.responds(),
"Commands must never respond with action()"
);
self.send(IpcRequest::Ipc {
command,
callback: None,
})
.unwrap();
}
fn disconnect(&self) {
let _ = self.send(IpcRequest::Ipc {
command: IpcCommand::Disconnect,
callback: None,
});
}
fn reconnect(&self) -> Result<(), IpcHandleError> {
let (tx, rx) = channel();
self.send(IpcRequest::Reconnect { callback: tx })
.map_err(|_| IpcHandleError::HandleDied)?;
rx.recv().map_err(|_| IpcHandleError::HandleDied)?
}
}
pub(crate) trait IpcRx {
fn wait_for_reconnect_request(&self) -> Option<Sender<Result<(), IpcHandleError>>>;
}
impl IpcRx for Receiver<IpcRequest> {
fn wait_for_reconnect_request(&self) -> Option<Sender<Result<(), IpcHandleError>>> {
loop {
match self.recv() {
Ok(IpcRequest::Ipc { callback, .. }) => {
if let Some(callback) = callback {
callback(Err(PacketError::Disconnect));
}
continue;
}
Ok(IpcRequest::Reconnect { callback }) => return Some(callback),
Err(_) => return None,
}
}
}
}