use std::{
collections::HashMap,
io::Write,
sync::{
Arc, LazyLock, Mutex, MutexGuard,
atomic::{AtomicBool, Ordering},
},
};
use crossbeam::channel::{self, Receiver, Sender};
use lunar_lib::{
database::Entry,
iterator_ext::IteratorExtensions,
log::{error, warn},
runners::{Executor, Runner, spawn_background_thread},
};
use selene_core::{
library::{collectable::Collectable, track::Track},
symphonia_helpers::raw_decoder::{Decoder, DecodingError},
};
#[cfg(feature = "plugin-support")]
use selene_plugin_sdk::local_session::{EventListener, StableEventListenerDynMut};
#[cfg(feature = "local-session")]
use crate::cpal_thread::CpalHandle;
#[cfg(feature = "plugin-support")]
use crate::dto::sdk_impls::Stable;
use crate::{
LoopMode, SessionToken, ShuffleMode,
common::{AudioHandshakeResponse, HandshakeError, Stream},
decoder::{DecoderCommandExt, DecoderHandle, DecoderInner},
dto::IntoDto,
ipc_common::{AudioPacket, Packetable, PlayerQueryFlags, QueryResult, RemoteEvent, TrackInfo},
playlist::{Playable, PlayingTrack, Playlist},
should_shutdown,
};
/// Process-wide table of sessions, keyed by session token. Created empty on first access.
static SESSION_REGISTRY: LazyLock<Mutex<HashMap<SessionToken, SessionHandle>>> =
    LazyLock::new(Mutex::default);

/// Locks the global session registry and returns the guard.
///
/// The registry stays locked for as long as the returned guard is alive.
///
/// # Panics
///
/// Panics if the registry mutex has been poisoned.
pub(crate) fn session_registry() -> MutexGuard<'static, HashMap<SessionToken, SessionHandle>> {
    SESSION_REGISTRY.lock().unwrap()
}
/// Cloneable set of endpoints for talking to one running session.
///
/// Produced by `SessionHandle::open`, which spawns the session's main, event and audio-sender
/// threads; the fields here are the sending halves those threads (and the decoder) listen on.
#[derive(Clone)]
pub(crate) struct SessionHandle {
/// Runner paired with the executor that `SessionMainThread::run` drains.
pub session_runner: Runner<SessionMainThread>,
/// Runner paired with the executor that `SessionEventThread::run` drains.
pub event_runner: Runner<SessionEventThread>,
/// Runner for the decoder, as returned by `DecoderHandle::open`.
pub decoder_runner: Runner<DecoderInner>,
/// Registers an additional audio client: the audio-sender thread sends each received stream a
/// handshake response and then adds it to its client list.
pub sink_tx: Sender<Box<dyn Stream>>,
/// Registers an additional event client: the event thread appends each received sender to its
/// client list.
pub controller_tx: Sender<Sender<Arc<[u8]>>>,
}
impl SessionHandle {
    /// Opens a session that starts with a single remote event client.
    ///
    /// `event_listener` becomes the first entry in the event thread's client list.
    pub(crate) fn create_with_event_client(event_listener: Sender<Arc<[u8]>>) -> Self {
        Self::open(
            Some(event_listener),
            None,
            #[cfg(feature = "local-session")]
            None,
            #[cfg(feature = "plugin-support")]
            None,
        )
    }

    /// Opens a session that starts with a single remote audio client.
    ///
    /// `audio_listener` is sent a handshake response (with no track info) and then becomes the
    /// first entry in the audio-sender thread's client list.
    pub(crate) fn create_with_audio_client(audio_listener: Box<dyn Stream>) -> SessionHandle {
        Self::open(
            None,
            Some(audio_listener),
            #[cfg(feature = "local-session")]
            None,
            #[cfg(feature = "plugin-support")]
            None,
        )
    }

    /// Opens a session with no remote clients whose audio-sender thread feeds `cpal_handle`.
    #[cfg(feature = "local-session")]
    pub(crate) fn open_local(cpal_handle: CpalHandle) -> Self {
        Self::open(
            None,
            None,
            Some(cpal_handle),
            #[cfg(feature = "plugin-support")]
            None,
        )
    }

    /// Like [`SessionHandle::open_local`], but additionally hands `event_callbacks` to the event
    /// thread so plugin listeners are invoked for every session event.
    // Gated on both features: the `CpalHandle` import and the cpal parameter of `open` exist only
    // with `local-session`, so gating on `plugin-support` alone cannot compile without it.
    #[cfg(all(feature = "local-session", feature = "plugin-support"))]
    pub(crate) fn open_local_with_plugins(
        cpal_handle: CpalHandle,
        event_callbacks: Vec<EventListener>,
    ) -> Self {
        Self::open(None, None, Some(cpal_handle), Some(event_callbacks))
    }

    /// Builds the channels, opens the decoder, and spawns the event, main and audio-sender
    /// background threads that make up a session.
    ///
    /// At least one of the optional consumers is expected to be present (checked with
    /// `debug_assert!` only). Returns the handle holding the sending side of every channel.
    fn open(
        event_listener: Option<Sender<Arc<[u8]>>>,
        mut audio_listener: Option<Box<dyn Stream>>,
        #[cfg(feature = "local-session")] cpal_handle: Option<CpalHandle>,
        #[cfg(feature = "plugin-support")] plugin_event_callbacks: Option<Vec<EventListener>>,
    ) -> Self {
        #[cfg(feature = "local-session")]
        debug_assert!(
            event_listener.is_some() || cpal_handle.is_some() || audio_listener.is_some()
        );
        #[cfg(not(feature = "local-session"))]
        debug_assert!(event_listener.is_some() || audio_listener.is_some());

        let (session_runner, session_exec) = lunar_lib::runners::channel();
        let (sink_tx, sink_rx) = channel::unbounded();
        let (controller_tx, controller_rx) = channel::unbounded();
        let (event_runner, event_exec) = lunar_lib::runners::channel();

        // Greet the initial audio client before any audio is produced. Best effort: the result
        // of the write is deliberately ignored.
        if let Some(audio_listener) = audio_listener.as_mut() {
            let _ = Ok::<_, HandshakeError>(AudioHandshakeResponse { track_info: None })
                .serialize_into_writer(audio_listener);
        }

        // Flags shared between the decoder and the main thread (`looping` ends up in the
        // playlist, `playing` in `SessionMainThread::is_playing`).
        let looping = Arc::new(AtomicBool::new(false));
        let playing = Arc::new(AtomicBool::new(false));

        let DecoderHandle {
            runner: decoder_runner,
            audio_buffer,
        } = DecoderHandle::open(
            playing.clone(),
            looping.clone(),
            session_runner.clone(),
            event_runner.clone(),
        );

        let event_thread = SessionEventThread {
            exec: event_exec,
            event_client_rx: controller_rx,
            event_clients: event_listener.into_iter().collect(),
            #[cfg(feature = "plugin-support")]
            plugin_event_callbacks,
        };
        spawn_background_thread(
            move || event_thread.run(),
            Some(|result| {
                if let Err(err) = result {
                    error!("Session closed with error: {err}");
                }
            }),
        );

        let main_thread = SessionMainThread::new(
            session_exec,
            event_runner.clone(),
            decoder_runner.clone(),
            looping,
            playing,
        );
        spawn_background_thread(
            move || main_thread.run(),
            Some(|result| {
                if let Err(err) = result {
                    error!("Session closed with error: {err}");
                }
            }),
        );

        let audio_thread = SessionAudioSenderThread {
            audio_client_rx: sink_rx,
            audio_clients: audio_listener.into_iter().collect(),
            audio_buffer,
            #[cfg(feature = "local-session")]
            cpal_handle,
            latest_track_info: None,
        };
        spawn_background_thread(
            move || audio_thread.run(),
            Some(|result| {
                if let Err(err) = result {
                    error!("Session closed with error: {err}");
                }
            }),
        );

        Self {
            session_runner,
            event_runner,
            decoder_runner,
            sink_tx,
            controller_tx,
        }
    }
}
/// State owned by the session's main thread: the playlist plus the runners it drives.
pub(crate) struct SessionMainThread {
/// Source of commands; `run` calls each received item with `&mut self`.
exec: Executor<Self>,
/// Used to emit session events (e.g. currently-playing changes and seeks).
event_runner: Runner<SessionEventThread>,
/// Used to load decoders into, and control playback of, the decoder.
decoder_runner: Runner<DecoderInner>,
/// Decoder and track prepared in advance for the upcoming track, if any.
preload: Option<(Decoder, PlayingTrack)>,
/// Playlist, queue and tracklist state.
playlist: Playlist,
/// Reported by `get_state` as the currently playing track.
// NOTE(review): initialised to `None` in `new`; no later assignment is visible in this file.
playing: Option<PlayingTrack>,
/// Playing flag read by `get_state`; the same `Arc` is also handed to `DecoderHandle::open`.
is_playing: Arc<AtomicBool>,
}
impl SessionMainThread {
    /// Creates the main-thread state with an empty preload slot and nothing playing.
    ///
    /// `looping` is handed to the playlist; `is_playing` is the flag later read by `get_state`.
    pub(crate) fn new(
        exec: Executor<Self>,
        event_runner: Runner<SessionEventThread>,
        decoder_runner: Runner<DecoderInner>,
        looping: Arc<AtomicBool>,
        is_playing: Arc<AtomicBool>,
    ) -> Self {
        Self {
            exec,
            event_runner,
            decoder_runner,
            preload: None,
            playlist: Playlist::new(looping),
            playing: None,
            is_playing,
        }
    }

    /// Executes incoming commands against `self` until shutdown is requested.
    ///
    /// The shutdown flag is checked once per loop iteration, i.e. before waiting for each
    /// command. Returns an error if receiving from the command channel fails.
    fn run(mut self) -> anyhow::Result<()> {
        while !should_shutdown() {
            crossbeam::select! {
                recv(self.exec) -> runner_fn => {
                    (runner_fn?)(&mut self);
                }
            }
        }
        Ok(())
    }

    /// Loads a decoder for the track that should play now and refreshes the preload slot.
    ///
    /// With `pop == true` the track is taken with `Playlist::pop_next`; otherwise the
    /// playlist's current track is used. Returns `Ok(true)` when a track was loaded and
    /// `Ok(false)` when there was nothing to load (in which case the decoder is stopped).
    fn replace_decoders(&mut self, pop: bool) -> anyhow::Result<bool> {
        let load = if pop {
            self.playlist.pop_next()
        } else {
            self.playlist.current()
        };
        let preload_track = self.playlist.peek_next();

        if let Some(load_track) = load {
            let loaded_decoder = Decoder::from_container(load_track.container(), 512 * 1024)?;

            // Always overwrite the slot. Leaving the previous preload in place when there is
            // no next track would let a later `skip`/`request_preload` hand out a decoder that
            // no longer matches the playlist position.
            self.preload = match preload_track {
                Some(preload_track) => {
                    let preloaded_decoder =
                        Decoder::from_container(preload_track.container(), 512 * 1024)?;
                    Some((preloaded_decoder, preload_track))
                }
                None => None,
            };

            self.decoder_runner.load(loaded_decoder)?;
            self.event_runner.currently_playing_changed(load_track)?;
            Ok(true)
        } else {
            self.decoder_runner.stop()??;
            Ok(false)
        }
    }

    /// Re-evaluates the preload slot from the playlist's next track.
    ///
    /// Clears the slot when there is no next track. On a decoding error the slot is left
    /// untouched and the error is returned.
    fn try_preload(&mut self) -> Result<(), DecodingError> {
        if let Some(playable) = self.playlist.peek_next() {
            let decoder = Decoder::from_container(playable.container(), 512 * 1024)?;
            self.preload = Some((decoder, playable));
        } else {
            self.preload = None;
        }
        Ok(())
    }

    /// Advances to the preloaded track, or stops the decoder when nothing is preloaded.
    fn skip(&mut self) -> anyhow::Result<()> {
        if let Some((decoder, source)) = self.preload.take() {
            self.decoder_runner.load(decoder)?;
            // Advance the playlist, then prepare the track that follows the new one.
            self.playlist.pop_next();
            self.try_preload()?;
            self.event_runner.currently_playing_changed(source)?;
        } else {
            self.decoder_runner.stop()??;
        }
        Ok(())
    }
}
// Commands executed on the session main thread. Plain `//` comments are used inside the macro
// invocation so that no extra attribute tokens are passed to the macro.
lunar_lib::make_runner_command_ext!(SessionMainThread => pub(crate) trait SessionRunnerExt {
    // Replaces the playlist with `playables`, loads the next track and, if one was loaded,
    // starts playback.
    fn play(&mut self, playables: Vec<Playable>) -> anyhow::Result<()> {
        self.playlist.set_playlist(playables);
        let loaded = self.replace_decoders(true)?;
        if loaded {
            self.decoder_runner.set_playing(true)??;
        }
        Ok(())
    }

    // Stops the decoder.
    fn stop(&mut self) -> anyhow::Result<()> {
        self.decoder_runner.stop()??;
        Ok(())
    }

    // Forwards the requested playing state to the decoder and returns the state it reports.
    fn set_is_playing(&mut self, is_playing: bool) -> anyhow::Result<bool> {
        let state = self.decoder_runner.set_playing(is_playing)??;
        Ok(state)
    }

    // Toggles the decoder's playing state and returns the state it reports.
    fn toggle_playing(&mut self) -> anyhow::Result<bool> {
        let state = self.decoder_runner.toggle_playing()??;
        Ok(state)
    }

    // Seeks the decoder, publishes a seek event with the resulting time, and returns that time.
    fn seek(&mut self, seconds: f64, increment: bool) -> anyhow::Result<Option<f64>> {
        let time = self.decoder_runner.seek(seconds, increment)??;
        self.event_runner.seek_occured(time)?;
        Ok(time)
    }

    // Moves to the next track. In `RepeatTrack` mode the current track is restarted instead.
    // Returns the playlist position afterwards.
    fn next(&mut self) -> anyhow::Result<Option<u32>> {
        if matches!(self.playlist.loop_mode(), LoopMode::RepeatTrack) {
            self.decoder_runner.seek(0.0, false)??;
        } else {
            self.skip()?;
        }
        Ok(self.playlist.position())
    }

    // Moves one step back in the tracklist. In `RepeatTrack` mode the current track is
    // restarted instead. Returns the playlist position afterwards.
    fn previous(&mut self) -> anyhow::Result<Option<u32>> {
        if matches!(self.playlist.loop_mode(), LoopMode::RepeatTrack) {
            self.decoder_runner.seek(0.0, false)??;
        } else {
            self.playlist.tracklist_seek(-1, true);
            self.replace_decoders(false)?;
        }
        Ok(self.playlist.position())
    }

    // Replaces the queue and refreshes the preload slot.
    fn queue_set(&mut self, tracks: Vec<Playable>) -> anyhow::Result<()> {
        self.playlist.set_queue(&tracks);
        self.try_preload()?;
        Ok(())
    }

    // Appends to the queue and refreshes the preload slot.
    fn queue_extend(&mut self, tracks: Vec<Playable>) -> anyhow::Result<()> {
        self.playlist.extend_queue(&tracks);
        self.try_preload()?;
        Ok(())
    }

    // Not implemented yet.
    fn queue_remove(&mut self, targets: Vec<(u32, Collectable)>) -> bool {
        todo!()
    }

    // Returns up to `limit` queue entries starting at `offset`. A window reaching past the end
    // is truncated (an out-of-range `offset` yields an empty list) instead of panicking, and
    // `offset + limit` can no longer overflow.
    fn queue_list(&mut self, limit: u32, offset: u32) -> Vec<Arc<Entry<Track>>> {
        self.playlist
            .queue()
            .iter()
            .skip(offset as usize)
            .take(limit as usize)
            .map(Arc::clone)
            .to_vec()
    }

    // Number of entries in the queue.
    fn queue_count(&mut self) -> u32 {
        self.playlist.queue().len() as u32
    }

    // Shuffles the queue and refreshes the preload slot.
    fn queue_shuffle(&mut self) -> anyhow::Result<()> {
        self.playlist.shuffle_queue();
        self.try_preload()?;
        Ok(())
    }

    // Empties the queue and refreshes the preload slot.
    fn queue_clear(&mut self) -> anyhow::Result<()> {
        self.playlist.clear_queue();
        self.try_preload()?;
        Ok(())
    }

    // Replaces the playlist without touching the loaded decoder; only the preload is refreshed.
    fn playlist_set(&mut self, playables: Vec<Playable>) -> anyhow::Result<()> {
        self.playlist.set_playlist(playables);
        self.try_preload()?;
        Ok(())
    }

    // Appends to the playlist and refreshes the preload slot.
    fn playlist_extend(&mut self, playables: Vec<Playable>) -> anyhow::Result<()> {
        self.playlist.extend_playlist(playables);
        self.try_preload()?;
        Ok(())
    }

    // Not implemented yet.
    fn playlist_remove(&mut self, targets: Vec<(u32, Collectable)>) -> bool {
        todo!()
    }

    // Returns up to `limit` playlist entries starting at `offset`; out-of-range windows are
    // truncated instead of panicking on the slice bounds.
    fn playlist_list(&mut self, limit: u32, offset: u32) -> Vec<Playable> {
        self.playlist
            .playlist()
            .iter()
            .skip(offset as usize)
            .take(limit as usize)
            .map(Playable::clone)
            .to_vec()
    }

    // Number of entries in the playlist.
    fn playlist_count(&mut self) -> u32 {
        self.playlist.playlist().len() as u32
    }

    // Clears the playlist and refreshes the preload slot.
    fn playlist_clear(&mut self) -> anyhow::Result<()> {
        self.playlist.clear();
        self.try_preload()?;
        Ok(())
    }

    // Changes the shuffle mode and refreshes the preload slot.
    fn set_shuffle_mode(&mut self, shuffle_mode: ShuffleMode) -> anyhow::Result<()> {
        self.playlist.set_shuffle_mode(shuffle_mode);
        self.try_preload()?;
        Ok(())
    }

    // Current shuffle mode of the playlist.
    fn get_shuffle_mode(&mut self) -> ShuffleMode {
        self.playlist.shuffle_mode()
    }

    // Changes the loop mode of the playlist.
    fn set_loop_mode(&mut self, loop_mode: LoopMode) -> () {
        self.playlist.set_loop_mode(loop_mode);
    }

    // Current loop mode of the playlist.
    fn get_loop_mode(&mut self) -> LoopMode {
        self.playlist.loop_mode()
    }

    // Moves the tracklist position (absolute, or relative when `increment` is set), reloads the
    // decoders for the new current track, and returns the position reported by the playlist.
    fn tracklist_seek(&mut self, index: i32, increment: bool) -> anyhow::Result<Option<u32>> {
        let position = self.playlist.tracklist_seek(index, increment);
        self.replace_decoders(false)?;
        Ok(position)
    }

    // Returns up to `limit` tracklist entries starting at `offset`; out-of-range windows are
    // truncated instead of panicking on the slice bounds.
    fn tracklist_list(&mut self, limit: u32, offset: u32) -> Vec<Arc<Entry<Track>>> {
        self.playlist
            .tracklist()
            .iter()
            .skip(offset as usize)
            .take(limit as usize)
            .map(Arc::clone)
            .to_vec()
    }

    // Number of entries in the tracklist.
    fn tracklist_count(&mut self) -> u32 {
        self.playlist.tracklist().len() as u32
    }

    // Reshuffles the tracklist.
    fn tracklist_reshuffle(&mut self) -> () {
        self.playlist.reshuffle_tracklist();
    }

    // Collects the pieces of player state selected by `flags`. Fields whose flag is not set
    // keep their `QueryResult::default()` value.
    fn get_state(&mut self, flags: PlayerQueryFlags) -> anyhow::Result<QueryResult> {
        let mut query_result = QueryResult::default();
        if flags.contains(PlayerQueryFlags::PLAYBACK_STATE) {
            query_result.playback_state = Some(self.is_playing.load(Ordering::Relaxed));
        }
        if flags.contains(PlayerQueryFlags::CURRENTLY_PLAYING) {
            query_result.currently_playing =
                Some(self.playing.as_ref().map(|p| (p.source.id(), p.position)));
        }
        if flags.contains(PlayerQueryFlags::TIME) {
            query_result.time = Some(self.decoder_runner.get_time()?);
        }
        if flags.contains(PlayerQueryFlags::QUEUE) {
            query_result.queue =
                Some(self.playlist.queue().iter().map(|t| t.id()).collect());
        }
        if flags.contains(PlayerQueryFlags::PLAYLIST) {
            query_result.playlist = Some(
                self.playlist
                    .playlist()
                    .iter()
                    .map(Playable::to_collectable)
                    .to_vec(),
            );
        }
        if flags.contains(PlayerQueryFlags::TRACKLIST) {
            query_result.tracklist =
                Some(self.playlist.tracklist().iter().map(|t| t.id()).collect());
        }
        if flags.contains(PlayerQueryFlags::TRACKLIST_POSITION) {
            query_result.tracklist_position = Some(self.playlist.position());
        }
        if flags.contains(PlayerQueryFlags::SHUFFLE_MODE) {
            query_result.shuffle_mode = Some(self.playlist.shuffle_mode());
        }
        // The loop mode has its own flag; it was previously gated on TRACKLIST_POSITION, so a
        // loop-mode query returned nothing and a position query leaked the loop mode.
        // NOTE(review): flag name follows the field/flag naming pattern above; confirm it
        // matches the `PlayerQueryFlags` definition.
        if flags.contains(PlayerQueryFlags::LOOP_MODE) {
            query_result.loop_mode = Some(self.playlist.loop_mode());
        }
        Ok(query_result)
    }
});
// Commands on the session main thread grouped under `DecoderMessageExt`
// (presumably invoked by the decoder, going by the trait name; confirm against the caller).
lunar_lib::make_runner_command_ext!(SessionMainThread => pub(crate) trait DecoderMessageExt {
// Hands out the preloaded decoder and track (if any), advances the playlist, and prepares a
// preload for the track after that.
//
// Panics if creating the next preload decoder fails.
fn request_preload(&mut self) -> Option<(Decoder, PlayingTrack)> {
// Take the slot first: `try_preload` below overwrites it.
let preload = self.preload.take();
self.playlist.pop_next();
self.try_preload().expect("Error handling not yet implemented");
preload
}
});
/// State owned by the session's event thread, which fans session events out to listeners.
pub(crate) struct SessionEventThread {
/// Source of event handlers; `run` calls each received item with `&mut self`.
exec: Executor<Self>,
/// Delivers newly registered event clients, which `run` appends to `event_clients`.
event_client_rx: Receiver<Sender<Arc<[u8]>>>,
/// Remote clients that receive every serialized event; a client is dropped from the list as
/// soon as sending to it fails.
event_clients: Vec<Sender<Arc<[u8]>>>,
/// Plugin listeners invoked directly for each event, when any were supplied.
#[cfg(feature = "plugin-support")]
pub(crate) plugin_event_callbacks: Option<Vec<EventListener>>,
}
impl SessionEventThread {
    /// Serializes `event` once and sends the bytes to every remote event client.
    ///
    /// Clients whose channel rejects the send are removed from the client list.
    fn send_remote_event(&mut self, event: RemoteEvent) {
        let payload = event.serialize_into_arc();
        self.event_clients.retain(|client| {
            let delivery = client.send(payload.clone());
            delivery.is_ok()
        });
    }

    /// Event-thread main loop: runs incoming event handlers and registers new event clients
    /// until shutdown is requested.
    ///
    /// Returns an error if receiving from either channel fails.
    fn run(mut self) -> anyhow::Result<()> {
        loop {
            if should_shutdown() {
                return Ok(());
            }
            crossbeam::select! {
                recv(self.exec) -> handler => {
                    (handler?)(&mut self);
                }
                recv(self.event_client_rx) -> registration => {
                    let new_client = registration?;
                    self.event_clients.push(new_client);
                }
            }
        }
    }
}
// Events handled on the session event thread. Each handler sends the event to the remote
// clients first and then, with `plugin-support`, calls every registered plugin listener.
// Plain `//` comments are used so that no extra attribute tokens are passed to the macro.
lunar_lib::make_runner_event_ext!(SessionEventThread => pub(crate) trait SessionEventExt {
    // Announces the new current track. If the track cannot be converted to its DTO, nothing is
    // sent and a warning is logged.
    fn currently_playing_changed(&mut self, current_playing: PlayingTrack) {
        let converted = (*current_playing.source).clone().into_dto();
        if let Ok(dto) = converted {
            let event = RemoteEvent::CurrentlyPlayingChanged {
                currently_playing: Box::new(dto.clone()),
                position: current_playing.position,
            };
            self.send_remote_event(event);
            #[cfg(feature = "plugin-support")]
            {
                // One shared DTO for all listeners; each gets its own `Arc` clone.
                let shared_dto = Arc::new(dto);
                for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                    listener.currently_playing_changed(
                        selene_plugin_sdk::Box::new(Stable::<Track>::from_dto(shared_dto.clone()))
                            .into(),
                        current_playing.position.into(),
                    );
                }
            }
        } else {
            warn!("Failed to send event to clients: Failed to get dto")
        }
    }

    // Announces a change of the playing state together with the time it changed at.
    fn playback_is_playing_changed(&mut self, is_playing: bool, changed_at: f64) {
        let event = RemoteEvent::PlaybackIsPlayingChanged { is_playing, changed_at };
        self.send_remote_event(event);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.playback_is_playing_changed(is_playing, changed_at);
            }
        }
    }

    // Announces that playback stopped.
    fn playback_stopped(&mut self) {
        self.send_remote_event(RemoteEvent::PlaybackStopped);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.playback_stopped();
            }
        }
    }

    // Announces the new shuffle mode.
    fn shuffle_mode_changed(&mut self, shuffle_mode: ShuffleMode) {
        let event = RemoteEvent::ShuffleModeChanged { shuffle_mode };
        self.send_remote_event(event);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.shuffle_mode_changed(shuffle_mode.into());
            }
        }
    }

    // Announces the new loop mode.
    fn loop_mode_changed(&mut self, loop_mode: LoopMode) {
        let event = RemoteEvent::LoopModeChanged { loop_mode };
        self.send_remote_event(event);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.loop_mode_changed(loop_mode.into());
            }
        }
    }

    // Announces a seek together with the time it produced, if any.
    fn seek_occured(&mut self, time: Option<f64>) {
        let event = RemoteEvent::SeekOccured { time };
        self.send_remote_event(event);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.seek_occured(time.into());
            }
        }
    }

    // Not implemented yet.
    fn queue_changed(&mut self, queue: Vec<Arc<Entry<Track>>>) {
        todo!()
    }

    // Not implemented yet.
    fn playlist_changed(&mut self, playlist: Vec<Arc<Playable>>) {
        todo!()
    }

    // Not implemented yet.
    fn tracklist_changed(&mut self, tracklist: Vec<Arc<Entry<Track>>>, position: Option<usize>) {
        todo!()
    }

    // Forwards a scrobble notification carrying `start_time`.
    fn scrobble(&mut self, start_time: u64) {
        let event = RemoteEvent::Scrobble { start_time };
        self.send_remote_event(event);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.scrobble(start_time);
            }
        }
    }

    // Forwards a "now playing" notification.
    fn now_playing(&mut self) {
        self.send_remote_event(RemoteEvent::NowPlaying);
        #[cfg(feature = "plugin-support")]
        {
            for listener in self.plugin_event_callbacks.iter_mut().flatten() {
                listener.now_playing();
            }
        }
    }
});
/// State owned by the session's audio-sender thread, which forwards audio packets to the
/// registered audio clients and, when present, to the local cpal handle.
pub(crate) struct SessionAudioSenderThread {
/// Delivers newly registered audio clients; `run` sends each one a handshake response and
/// adds it to `audio_clients`.
pub audio_client_rx: Receiver<Box<dyn Stream>>,
/// Streams that receive every length-prefixed serialized packet; a stream is dropped from the
/// list as soon as writing to it fails.
pub audio_clients: Vec<Box<dyn Stream>>,
/// Source of audio packets, obtained from `DecoderHandle::open`.
pub audio_buffer: Receiver<AudioPacket>,
/// Local output fed directly from the packets, when the session was opened with one.
#[cfg(feature = "local-session")]
pub cpal_handle: Option<CpalHandle>,
/// Track info placed in the handshake response sent to audio clients that register while the
/// thread is running; updated by `run` from `AudioPacket::TrackInfo` packets.
pub latest_track_info: Option<TrackInfo>,
}
impl SessionAudioSenderThread {
    /// Audio-sender main loop: forwards audio packets to the local cpal handle and the remote
    /// audio clients, and registers new audio clients, until shutdown is requested.
    ///
    /// # Errors
    ///
    /// Returns an error if receiving from either channel fails.
    ///
    /// # Panics
    ///
    /// Panics if the cpal handle rejects a packet, if a `Disconnect` packet arrives while a cpal
    /// handle is attached, or if packet serialization fails.
    pub fn run(mut self) -> anyhow::Result<()> {
        while !should_shutdown() {
            crossbeam::select! {
                recv(self.audio_buffer) -> audio_packet => {
                    let audio_packet = audio_packet?;

                    // Remember the latest track info for every kind of session. This used to
                    // happen only inside the cpal branch, so sessions without a local output
                    // greeted late-joining audio clients with `track_info: None`.
                    if let AudioPacket::TrackInfo(track_info) = &audio_packet {
                        self.latest_track_info = Some(*track_info);
                    }

                    #[cfg(feature = "local-session")]
                    if let Some(cpal_handle) = &mut self.cpal_handle {
                        match audio_packet {
                            AudioPacket::TrackInfo(track_info) => {
                                cpal_handle.set_track_info(track_info).unwrap();
                            }
                            AudioPacket::Audio(ref items) => cpal_handle.input_audio_packet(items).unwrap(),
                            AudioPacket::Disconnect => panic!("Local session should never close"),
                        }
                    }

                    if !self.audio_clients.is_empty() {
                        let mut serialized_audio =
                            postcard::to_stdvec(&audio_packet).expect("Serialization should never fail");
                        // Frame layout: 4-byte big-endian payload length, then the payload. The
                        // length is appended and then rotated to the front.
                        serialized_audio.extend((serialized_audio.len() as u32).to_be_bytes());
                        serialized_audio.rotate_right(4);
                        // Clients that fail a write are dropped from the list.
                        self.audio_clients
                            .retain_mut(|c| c.write_all(&serialized_audio).is_ok());
                    }
                }
                recv(self.audio_client_rx) -> audio_sink => {
                    let mut audio_sink = audio_sink?;
                    // Best-effort handshake; a broken client is dropped on its first failed
                    // audio write.
                    let _ = Ok::<_, HandshakeError>(AudioHandshakeResponse { track_info: self.latest_track_info })
                        .serialize_into_writer(&mut audio_sink);
                    self.audio_clients.push(audio_sink);
                }
            }
        }
        Ok(())
    }
}