#![cfg(feature = "async")]
#![allow(clippy::module_name_repetitions)]
use core::ffi::c_void;
use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream};
use crate::ffi::{self, StreamEventCallback};
use crate::remote_commands::{
Command, CommandEvent, LanguageOptionSetting, RepeatType, SeekType, ShuffleType,
};
/// Owns one native subscription together with the boxed stream sender that
/// was handed to the native side as the callback context.
///
/// Cleanup is performed by the `Drop` impl through the two stored function
/// pointers, which lets this single handle type serve every stream in this
/// file regardless of the event type carried by the sender.
struct SubscriptionHandle {
/// Opaque handle returned by the FFI subscribe call; null when that call
/// returned null.
ptr: *mut c_void,
/// Type-erased pointer to the boxed sender passed as callback context;
/// null when subscribing returned null (the box is freed immediately then).
sender: *mut c_void,
/// Invoked with `ptr` on drop to cancel the native subscription.
unsubscribe: unsafe fn(*mut c_void),
/// Invoked with `sender` on drop to reclaim and drop the boxed sender.
free_sender: unsafe fn(*mut c_void),
}
impl Drop for SubscriptionHandle {
    /// Cancels the native subscription and then frees the boxed sender.
    ///
    /// Either step is skipped when its pointer is null, which is the state a
    /// handle is left in when the subscribe call returned null.
    fn drop(&mut self) {
        let subscription = self.ptr;
        let context = self.sender;

        // Unsubscribe first — presumably so the native side stops invoking the
        // callback before the context it points at is released.
        if !subscription.is_null() {
            // SAFETY: a non-null `ptr` is the handle returned by the subscribe
            // call paired with `unsubscribe`, and it is released only here.
            unsafe { (self.unsubscribe)(subscription) };
        }

        if !context.is_null() {
            // SAFETY: a non-null `sender` is the `Box::into_raw` pointer whose
            // concrete type matches `free_sender`; it is freed only here.
            unsafe { (self.free_sender)(context) };
        }
    }
}
// SAFETY — NOTE(review): the handle stores only raw pointers and function
// pointers and uses them solely in `Drop`. These impls assume the native
// unsubscribe call may run on any thread and that the boxed sender may be
// dropped on any thread; confirm against the FFI layer and `AsyncStreamSender`.
unsafe impl Send for SubscriptionHandle {}
unsafe impl Sync for SubscriptionHandle {}
/// Reclaims and drops a boxed `AsyncStreamSender<NotificationEvent>`.
///
/// # Safety
///
/// `ptr` must have been produced by `Box::into_raw` on a
/// `Box<AsyncStreamSender<NotificationEvent>>` and must not be used again
/// after this call.
unsafe fn free_notification_sender(ptr: *mut c_void) {
    let typed = ptr.cast::<AsyncStreamSender<NotificationEvent>>();
    // SAFETY: guaranteed by the caller, see `# Safety` above.
    let _reclaimed = unsafe { Box::from_raw(typed) };
    // `_reclaimed` goes out of scope here, dropping the sender.
}
/// Reclaims and drops a boxed `AsyncStreamSender<CommandEvent>`.
///
/// # Safety
///
/// `ptr` must have been produced by `Box::into_raw` on a
/// `Box<AsyncStreamSender<CommandEvent>>` and must not be used again after
/// this call.
unsafe fn free_command_sender(ptr: *mut c_void) {
    let typed = ptr.cast::<AsyncStreamSender<CommandEvent>>();
    // SAFETY: guaranteed by the caller, see `# Safety` above.
    let _reclaimed = unsafe { Box::from_raw(typed) };
    // `_reclaimed` goes out of scope here, dropping the sender.
}
/// Reclaims and drops a boxed `AsyncStreamSender<NowPlayingSessionEvent>`.
///
/// # Safety
///
/// `ptr` must have been produced by `Box::into_raw` on a
/// `Box<AsyncStreamSender<NowPlayingSessionEvent>>` and must not be used
/// again after this call.
unsafe fn free_session_sender(ptr: *mut c_void) {
    let typed = ptr.cast::<AsyncStreamSender<NowPlayingSessionEvent>>();
    // SAFETY: guaranteed by the caller, see `# Safety` above.
    let _reclaimed = unsafe { Box::from_raw(typed) };
    // `_reclaimed` goes out of scope here, dropping the sender.
}
/// Identifies which notification a subscription listens for.
///
/// The explicit `i32` discriminants are the codes exchanged with the native
/// side: the value is passed to `ffi::mp_notification_subscribe` and the same
/// codes are decoded again in `notification_cb`, so they must not be changed
/// independently of that callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum NotificationKind {
/// Now-playing item change notification (code `0`).
NowPlayingItemDidChange = 0,
/// Playback state change notification (code `1`).
PlaybackStateDidChange = 1,
/// Volume change notification (code `2`).
VolumeDidChange = 2,
/// Media library change notification (code `3`).
MediaLibraryDidChange = 3,
}
/// Event delivered by the notification streams in this file.
///
/// It carries only the kind of notification; the callback that builds it
/// ignores the native payload pointer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NotificationEvent {
/// Which notification fired.
pub kind: NotificationKind,
}
/// C callback registered by `subscribe_notification`; turns a native
/// notification code into a [`NotificationEvent`] and pushes it to the sender
/// stored behind `ctx`.
///
/// Unknown `kind` codes and a null `ctx` are ignored. The payload pointer is
/// not read.
///
/// # Safety
///
/// `ctx` must be null or point to a live `AsyncStreamSender<NotificationEvent>`
/// for the duration of the call.
unsafe extern "C" fn notification_cb(kind: i32, _payload: *const c_void, ctx: *mut c_void) {
    // Decode first so an unrecognised code never touches `ctx` at all.
    let notification_kind = match kind {
        0 => NotificationKind::NowPlayingItemDidChange,
        1 => NotificationKind::PlaybackStateDidChange,
        2 => NotificationKind::VolumeDidChange,
        3 => NotificationKind::MediaLibraryDidChange,
        _ => return,
    };

    // SAFETY: per the contract above `ctx` is either null (handled by
    // `as_ref` returning `None`) or a valid, live sender.
    let Some(sender) = (unsafe { ctx.cast::<AsyncStreamSender<NotificationEvent>>().as_ref() })
    else {
        return;
    };

    sender.push(NotificationEvent { kind: notification_kind });
}
fn subscribe_notification(
kind: NotificationKind,
capacity: usize,
) -> (BoundedAsyncStream<NotificationEvent>, SubscriptionHandle) {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender));
let handle_ptr = unsafe {
ffi::mp_notification_subscribe(
kind as i32,
Some(notification_cb as StreamEventCallback),
sender_ptr.cast(),
)
};
if handle_ptr.is_null() {
unsafe { drop(Box::from_raw(sender_ptr)) };
}
let handle = SubscriptionHandle {
ptr: handle_ptr,
sender: if handle_ptr.is_null() { core::ptr::null_mut() } else { sender_ptr.cast() },
unsubscribe: |ptr| unsafe { ffi::mp_notification_unsubscribe(ptr) },
free_sender: free_notification_sender,
};
(stream, handle)
}
/// Stream of [`NotificationEvent`]s for
/// [`NotificationKind::NowPlayingItemDidChange`].
///
/// Dropping the stream drops its subscription handle, which cancels the
/// native subscription and frees the callback context.
pub struct NowPlayingItemChangeStream {
    inner: BoundedAsyncStream<NotificationEvent>,
    _handle: SubscriptionHandle,
}

impl NowPlayingItemChangeStream {
    /// Subscribes to now-playing item changes.
    ///
    /// `capacity` is passed through to `BoundedAsyncStream::new`. If the
    /// native subscribe call returns null, the sender is dropped right away,
    /// so no event will ever be delivered on the returned stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let kind = NotificationKind::NowPlayingItemDidChange;
        let (inner, _handle) = subscribe_notification(kind, capacity);
        Self { inner, _handle }
    }

    /// Returns the inner stream's `NextItem` for the next event.
    ///
    /// NOTE(review): presumably a future to `.await` — confirm in
    /// `doom_fish_utils::stream`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        let events = &self.inner;
        events.next()
    }

    /// Delegates to the inner stream's `try_next`; `None` when it has no
    /// event to hand out.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        let events = &self.inner;
        events.try_next()
    }

    /// Reports the inner stream's `buffered_count`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        let events = &self.inner;
        events.buffered_count()
    }

    /// Reports the inner stream's `is_closed` state.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        let events = &self.inner;
        events.is_closed()
    }
}
/// Stream of [`NotificationEvent`]s for
/// [`NotificationKind::PlaybackStateDidChange`].
///
/// Dropping the stream drops its subscription handle, which cancels the
/// native subscription and frees the callback context.
pub struct PlaybackStateChangeStream {
    inner: BoundedAsyncStream<NotificationEvent>,
    _handle: SubscriptionHandle,
}

impl PlaybackStateChangeStream {
    /// Subscribes to playback state changes.
    ///
    /// `capacity` is passed through to `BoundedAsyncStream::new`. If the
    /// native subscribe call returns null, the sender is dropped right away,
    /// so no event will ever be delivered on the returned stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let kind = NotificationKind::PlaybackStateDidChange;
        let (inner, _handle) = subscribe_notification(kind, capacity);
        Self { inner, _handle }
    }

    /// Returns the inner stream's `NextItem` for the next event.
    ///
    /// NOTE(review): presumably a future to `.await` — confirm in
    /// `doom_fish_utils::stream`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        let events = &self.inner;
        events.next()
    }

    /// Delegates to the inner stream's `try_next`; `None` when it has no
    /// event to hand out.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        let events = &self.inner;
        events.try_next()
    }

    /// Reports the inner stream's `buffered_count`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        let events = &self.inner;
        events.buffered_count()
    }

    /// Reports the inner stream's `is_closed` state.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        let events = &self.inner;
        events.is_closed()
    }
}
/// Stream of [`NotificationEvent`]s for [`NotificationKind::VolumeDidChange`].
///
/// Dropping the stream drops its subscription handle, which cancels the
/// native subscription and frees the callback context.
pub struct VolumeChangeStream {
    inner: BoundedAsyncStream<NotificationEvent>,
    _handle: SubscriptionHandle,
}

impl VolumeChangeStream {
    /// Subscribes to volume changes.
    ///
    /// `capacity` is passed through to `BoundedAsyncStream::new`. If the
    /// native subscribe call returns null, the sender is dropped right away,
    /// so no event will ever be delivered on the returned stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let kind = NotificationKind::VolumeDidChange;
        let (inner, _handle) = subscribe_notification(kind, capacity);
        Self { inner, _handle }
    }

    /// Returns the inner stream's `NextItem` for the next event.
    ///
    /// NOTE(review): presumably a future to `.await` — confirm in
    /// `doom_fish_utils::stream`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        let events = &self.inner;
        events.next()
    }

    /// Delegates to the inner stream's `try_next`; `None` when it has no
    /// event to hand out.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        let events = &self.inner;
        events.try_next()
    }

    /// Reports the inner stream's `buffered_count`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        let events = &self.inner;
        events.buffered_count()
    }

    /// Reports the inner stream's `is_closed` state.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        let events = &self.inner;
        events.is_closed()
    }
}
/// Stream of [`NotificationEvent`]s for
/// [`NotificationKind::MediaLibraryDidChange`].
///
/// Dropping the stream drops its subscription handle, which cancels the
/// native subscription and frees the callback context.
pub struct MediaLibraryChangeStream {
    inner: BoundedAsyncStream<NotificationEvent>,
    _handle: SubscriptionHandle,
}

impl MediaLibraryChangeStream {
    /// Subscribes to media library changes.
    ///
    /// `capacity` is passed through to `BoundedAsyncStream::new`. If the
    /// native subscribe call returns null, the sender is dropped right away,
    /// so no event will ever be delivered on the returned stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let kind = NotificationKind::MediaLibraryDidChange;
        let (inner, _handle) = subscribe_notification(kind, capacity);
        Self { inner, _handle }
    }

    /// Returns the inner stream's `NextItem` for the next event.
    ///
    /// NOTE(review): presumably a future to `.await` — confirm in
    /// `doom_fish_utils::stream`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        let events = &self.inner;
        events.next()
    }

    /// Delegates to the inner stream's `try_next`; `None` when it has no
    /// event to hand out.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        let events = &self.inner;
        events.try_next()
    }

    /// Reports the inner stream's `buffered_count`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        let events = &self.inner;
        events.buffered_count()
    }

    /// Reports the inner stream's `is_closed` state.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        let events = &self.inner;
        events.is_closed()
    }
}
/// C-layout payload read by `remote_command_cb` from the native `payload`
/// pointer.
///
/// NOTE(review): field order and types must match the struct produced on the
/// native side — confirm against its definition before changing anything.
/// Absent values are encoded as NaN (for `f64` fields) or a negative number
/// (for `i32` fields), as decoded in `remote_command_cb`.
#[repr(C)]
#[derive(Clone, Copy)]
struct RawCommandPayload {
/// Command identifier, decoded with `Command::from_id`.
command_id: i32,
/// Copied unchanged into `CommandEvent::timestamp`.
timestamp: f64,
/// Command-dependent value: the skip interval for skip forward/backward,
/// the position for change-playback-position; NaN means absent.
extra: f64,
/// Raw seek type, read only for seek forward/backward; negative means absent.
seek_type: i32,
/// Rating; NaN means absent.
rating: f64,
/// Playback rate; NaN means absent.
playback_rate: f64,
/// Feedback "negative" flag: negative means absent, otherwise non-zero is `true`.
negative: i32,
/// Raw shuffle type; negative means absent.
shuffle_type: i32,
/// Raw repeat type; negative means absent.
repeat_type: i32,
/// Tri-state flag: negative means absent, otherwise non-zero is `true`.
preserves_shuffle_mode: i32,
/// Tri-state flag: negative means absent, otherwise non-zero is `true`.
preserves_repeat_mode: i32,
/// Raw language option setting; negative means absent.
language_option_setting: i32,
}
/// C callback registered by `RemoteCommandStream::subscribe`; decodes the
/// native `RawCommandPayload` into a [`CommandEvent`] and pushes it to the
/// sender stored behind `ctx`.
///
/// The call is ignored when either `payload` or `ctx` is null. Optional
/// fields use the payload's sentinels: NaN for floats and negative values
/// for integers mean "absent".
///
/// # Safety
///
/// `payload` must be null or point to a valid `RawCommandPayload`, and `ctx`
/// must be null or point to a live `AsyncStreamSender<CommandEvent>`, both
/// for the duration of the call.
unsafe extern "C" fn remote_command_cb(
    _kind: i32,
    payload: *const c_void,
    ctx: *mut c_void,
) {
    // SAFETY: per the contract above each pointer is either null (turned into
    // `None` by `as_ref`) or valid for reads for the duration of this call.
    let Some(raw) = (unsafe { payload.cast::<RawCommandPayload>().as_ref() }) else {
        return;
    };
    // SAFETY: same contract as for `payload`.
    let Some(sender) = (unsafe { ctx.cast::<AsyncStreamSender<CommandEvent>>().as_ref() }) else {
        return;
    };

    // NOTE(review): an id that `Command::from_id` does not recognise is
    // reported as `Command::Play`; confirm this fallback is intended rather
    // than dropping the event.
    let command = Command::from_id(raw.command_id).unwrap_or(Command::Play);

    let event = CommandEvent {
        command,
        timestamp: raw.timestamp,
        // `extra` is the skip interval only for the skip commands.
        skip_interval: matches!(command, Command::SkipForward | Command::SkipBackward)
            .then_some(raw.extra)
            .filter(|v| !v.is_nan()),
        seek_type: matches!(command, Command::SeekForward | Command::SeekBackward)
            .then(|| SeekType::from_raw(raw.seek_type))
            .filter(|_| raw.seek_type >= 0),
        // `extra` is the target position only for change-playback-position.
        position: matches!(command, Command::ChangePlaybackPosition)
            .then_some(raw.extra)
            .filter(|v| !v.is_nan()),
        rating: (!raw.rating.is_nan()).then_some(raw.rating),
        playback_rate: (!raw.playback_rate.is_nan()).then_some(raw.playback_rate),
        feedback_negative: (raw.negative >= 0).then_some(raw.negative != 0),
        shuffle_type: (raw.shuffle_type >= 0).then(|| ShuffleType::from_raw(raw.shuffle_type)),
        repeat_type: (raw.repeat_type >= 0).then(|| RepeatType::from_raw(raw.repeat_type)),
        preserves_shuffle_mode: (raw.preserves_shuffle_mode >= 0)
            .then_some(raw.preserves_shuffle_mode != 0),
        preserves_repeat_mode: (raw.preserves_repeat_mode >= 0)
            .then_some(raw.preserves_repeat_mode != 0),
        // The raw payload carries no language option, so it is never set here.
        language_option: None,
        language_option_setting: (raw.language_option_setting >= 0)
            .then(|| LanguageOptionSetting::from_raw(raw.language_option_setting)),
    };

    sender.push(event);
}
pub struct RemoteCommandStream {
inner: BoundedAsyncStream<CommandEvent>,
_handle: SubscriptionHandle,
}
impl RemoteCommandStream {
#[must_use]
pub fn subscribe(command: Command, capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender));
let handle_ptr = unsafe {
ffi::mp_stream_remote_command_subscribe(
command as i32,
Some(remote_command_cb as StreamEventCallback),
sender_ptr.cast(),
)
};
if handle_ptr.is_null() {
unsafe { drop(Box::from_raw(sender_ptr)) };
}
let handle = SubscriptionHandle {
ptr: handle_ptr,
sender: if handle_ptr.is_null() { core::ptr::null_mut() } else { sender_ptr.cast() },
unsubscribe: |ptr| unsafe { ffi::mp_stream_remote_command_unsubscribe(ptr) },
free_sender: free_command_sender,
};
Self { inner: stream, _handle: handle }
}
#[must_use]
pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, CommandEvent> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<CommandEvent> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
/// Kind of change reported by a now-playing session subscription.
///
/// `now_playing_session_cb` maps native code `0` to `DidChangeActive` and
/// code `1` to `DidChangeCanBecomeActive`; other codes are ignored there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum NowPlayingSessionEventKind {
/// Produced for native code `0`.
DidChangeActive,
/// Produced for native code `1`.
DidChangeCanBecomeActive,
}
/// Event delivered by `NowPlayingSessionStream`.
///
/// It carries only the kind of change; the callback that builds it ignores
/// the native payload pointer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NowPlayingSessionEvent {
/// Which session change was reported.
pub kind: NowPlayingSessionEventKind,
}
/// C callback registered by `NowPlayingSessionStream::subscribe`; turns a
/// native session code into a [`NowPlayingSessionEvent`] and pushes it to the
/// sender stored behind `ctx`.
///
/// Unknown `kind` codes and a null `ctx` are ignored. The payload pointer is
/// not read.
///
/// # Safety
///
/// `ctx` must be null or point to a live
/// `AsyncStreamSender<NowPlayingSessionEvent>` for the duration of the call.
unsafe extern "C" fn now_playing_session_cb(
    kind: i32,
    _payload: *const c_void,
    ctx: *mut c_void,
) {
    // Decode first so an unrecognised code never touches `ctx` at all.
    let event_kind = match kind {
        0 => NowPlayingSessionEventKind::DidChangeActive,
        1 => NowPlayingSessionEventKind::DidChangeCanBecomeActive,
        _ => return,
    };

    // SAFETY: per the contract above `ctx` is either null (handled by
    // `as_ref` returning `None`) or a valid, live sender.
    let Some(sender) =
        (unsafe { ctx.cast::<AsyncStreamSender<NowPlayingSessionEvent>>().as_ref() })
    else {
        return;
    };

    sender.push(NowPlayingSessionEvent { kind: event_kind });
}
pub struct NowPlayingSessionStream {
inner: BoundedAsyncStream<NowPlayingSessionEvent>,
_handle: SubscriptionHandle,
}
impl NowPlayingSessionStream {
#[must_use]
pub fn subscribe(capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender));
let handle_ptr = unsafe {
ffi::mp_now_playing_session_stream_subscribe(
Some(now_playing_session_cb as StreamEventCallback),
sender_ptr.cast(),
)
};
if handle_ptr.is_null() {
unsafe { drop(Box::from_raw(sender_ptr)) };
}
let handle = SubscriptionHandle {
ptr: handle_ptr,
sender: if handle_ptr.is_null() { core::ptr::null_mut() } else { sender_ptr.cast() },
unsubscribe: |ptr| unsafe { ffi::mp_now_playing_session_stream_unsubscribe(ptr) },
free_sender: free_session_sender,
};
Self { inner: stream, _handle: handle }
}
#[must_use]
pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NowPlayingSessionEvent> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<NowPlayingSessionEvent> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}