#![cfg(feature = "async")]
#![allow(clippy::module_name_repetitions)]
use core::ffi::c_void;
use core::sync::atomic::{AtomicUsize, Ordering};
use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream};
use crate::ffi::{self, StreamEventCallback};
use crate::remote_commands::{
Command, CommandEvent, LanguageOptionSetting, RepeatType, SeekType, ShuffleType,
};
/// Heap-allocated, manually reference-counted context handed to the native
/// layer as an opaque pointer.
///
/// It owns a type-erased sender pointer together with the function that knows
/// how to free it.
struct StreamContext {
    /// Number of outstanding owners; starts at one for the creator.
    ref_count: AtomicUsize,
    /// Type-erased pointer to the boxed sender.
    sender: *mut c_void,
    /// Frees `sender`; invoked exactly once, when the last owner releases.
    drop_sender: unsafe fn(*mut c_void),
}

impl StreamContext {
    /// Allocates a context with a reference count of one and leaks it as a
    /// raw pointer. The allocation is reclaimed by the final [`Self::release`].
    fn new(sender: *mut c_void, drop_sender: unsafe fn(*mut c_void)) -> *mut Self {
        let boxed = Box::new(Self {
            ref_count: AtomicUsize::new(1),
            sender,
            drop_sender,
        });
        Box::into_raw(boxed)
    }

    /// Adds one owner.
    ///
    /// # Safety
    /// `ptr` must be non-null and point to a context that has not yet been
    /// freed by its final `release`.
    unsafe fn retain(ptr: *mut Self) {
        // SAFETY: the caller guarantees `ptr` is live.
        let ctx = unsafe { &*ptr };
        // Relaxed is enough for an increment: the caller already holds a
        // reference, so the object cannot disappear underneath us.
        ctx.ref_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Removes one owner; the last owner frees the sender and the context.
    /// A null `ptr` is ignored.
    ///
    /// # Safety
    /// A non-null `ptr` must point to a live context, and the caller must own
    /// the reference being given up.
    unsafe fn release(ptr: *mut Self) {
        if ptr.is_null() {
            return;
        }
        // SAFETY: non-null and live per the caller's contract.
        let remaining_before = unsafe { &*ptr }.ref_count.fetch_sub(1, Ordering::Release);
        if remaining_before != 1 {
            return;
        }
        // Pair with the Release decrements of the other owners so that all of
        // their writes are visible before the context is torn down.
        core::sync::atomic::fence(Ordering::Acquire);
        // SAFETY: we were the last owner, so reclaiming the box is exclusive.
        let owned = unsafe { Box::from_raw(ptr) };
        let free_sender = owned.drop_sender;
        // SAFETY: `sender` was supplied together with its matching destructor.
        unsafe { free_sender(owned.sender) };
    }
}
/// C-ABI trampoline that bumps the reference count of a `StreamContext`.
/// A null `context` is ignored.
extern "C" fn context_retain_cb(context: *mut c_void) {
    if context.is_null() {
        return;
    }
    let typed = context.cast::<StreamContext>();
    // SAFETY: non-null contexts passed to this callback originate from
    // `StreamContext::new` and are still owned by the caller.
    unsafe { StreamContext::retain(typed) };
}
/// C-ABI trampoline that gives up one reference to a `StreamContext`.
/// Null handling is delegated to `StreamContext::release`.
extern "C" fn context_release_cb(context: *mut c_void) {
    let typed: *mut StreamContext = context.cast();
    // SAFETY: `release` tolerates null; otherwise the pointer originates from
    // `StreamContext::new` and the caller owns the reference it drops.
    unsafe { StreamContext::release(typed) };
}
/// Guard for one native subscription.
///
/// Dropping it unsubscribes (when the native handle is non-null) and then
/// releases this side's reference to the shared `StreamContext`.
struct SubscriptionHandle {
    /// Opaque native subscription handle; may be null.
    ptr: *mut c_void,
    /// Context whose reference this handle owns.
    context: *mut StreamContext,
    /// Native unsubscribe routine matching `ptr`.
    unsubscribe: unsafe fn(*mut c_void),
}

impl Drop for SubscriptionHandle {
    fn drop(&mut self) {
        let native = self.ptr;
        if !native.is_null() {
            let unsubscribe = self.unsubscribe;
            // SAFETY: `native` was returned by the subscribe call paired with
            // this unsubscribe routine and has not been unsubscribed yet.
            unsafe { unsubscribe(native) };
        }
        // Unsubscribe first, then give up our reference to the context.
        // SAFETY: this handle owns exactly one reference to `context`.
        unsafe { StreamContext::release(self.context) };
    }
}

// SAFETY: asserted by the original author; the handle only stores raw pointers
// and a function pointer, and touches them solely in `drop`.
unsafe impl Send for SubscriptionHandle {}
// SAFETY: see the `Send` impl; the handle exposes no `&self` operations.
unsafe impl Sync for SubscriptionHandle {}
/// Frees a type-erased `Box<AsyncStreamSender<NotificationEvent>>`.
///
/// # Safety
/// `ptr` must come from `Box::into_raw` on exactly that type and must not be
/// used afterwards.
unsafe fn free_notification_sender(ptr: *mut c_void) {
    let typed = ptr.cast::<AsyncStreamSender<NotificationEvent>>();
    // SAFETY: guaranteed by the caller; the box is dropped at end of scope.
    let _reclaimed = unsafe { Box::from_raw(typed) };
}
/// Frees a type-erased `Box<AsyncStreamSender<CommandEvent>>`.
///
/// # Safety
/// `ptr` must come from `Box::into_raw` on exactly that type and must not be
/// used afterwards.
unsafe fn free_command_sender(ptr: *mut c_void) {
    let typed = ptr.cast::<AsyncStreamSender<CommandEvent>>();
    // SAFETY: guaranteed by the caller; the box is dropped at end of scope.
    let _reclaimed = unsafe { Box::from_raw(typed) };
}
/// Frees a type-erased `Box<AsyncStreamSender<NowPlayingSessionEvent>>`.
///
/// # Safety
/// `ptr` must come from `Box::into_raw` on exactly that type and must not be
/// used afterwards.
unsafe fn free_session_sender(ptr: *mut c_void) {
    let typed = ptr.cast::<AsyncStreamSender<NowPlayingSessionEvent>>();
    // SAFETY: guaranteed by the caller; the box is dropped at end of scope.
    let _reclaimed = unsafe { Box::from_raw(typed) };
}
/// Category of notification a subscription listens for.
///
/// The explicit discriminants are the `i32` values exchanged with the native
/// layer: `subscribe_notification` sends `kind as i32`, and `notification_cb`
/// maps the same integers back to these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum NotificationKind {
/// Native value `0`; subscribed to by `NowPlayingItemChangeStream`.
NowPlayingItemDidChange = 0,
/// Native value `1`; subscribed to by `PlaybackStateChangeStream`.
PlaybackStateDidChange = 1,
/// Native value `2`; subscribed to by `VolumeChangeStream`.
VolumeDidChange = 2,
/// Native value `3`; subscribed to by `MediaLibraryChangeStream`.
MediaLibraryDidChange = 3,
}
/// Event pushed into the notification streams by `notification_cb`.
///
/// It carries only the notification kind; the callback ignores the native
/// payload pointer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NotificationEvent {
/// Which notification fired.
pub kind: NotificationKind,
}
/// Native callback that turns a raw notification into a [`NotificationEvent`]
/// and pushes it to the stream's sender.
///
/// Calls with a null `ctx` or an unrecognised `kind` are ignored. The payload
/// pointer is not read.
///
/// # Safety
/// A non-null `ctx` must point to a live `StreamContext` whose `sender` field
/// is a leaked `Box<AsyncStreamSender<NotificationEvent>>`.
unsafe extern "C" fn notification_cb(kind: i32, _payload: *const c_void, ctx: *mut c_void) {
    // The retain/release callbacks tolerate a null context, so this one must
    // as well: dereferencing null below would be undefined behaviour.
    if ctx.is_null() {
        return;
    }
    let notification_kind = match kind {
        0 => NotificationKind::NowPlayingItemDidChange,
        1 => NotificationKind::PlaybackStateDidChange,
        2 => NotificationKind::VolumeDidChange,
        3 => NotificationKind::MediaLibraryDidChange,
        // Unknown native values are dropped rather than misreported.
        _ => return,
    };
    // SAFETY: `ctx` is non-null and, per the contract above, a live context.
    let context = unsafe { &*ctx.cast::<StreamContext>() };
    // SAFETY: the context was created with a sender of exactly this type.
    let sender = unsafe {
        &*context
            .sender
            .cast::<AsyncStreamSender<NotificationEvent>>()
    };
    sender.push(NotificationEvent {
        kind: notification_kind,
    });
}
/// Creates a bounded stream of `capacity` and registers `notification_cb`
/// with the native layer for notifications of `kind`.
///
/// Returns the receiving stream together with the guard that unsubscribes and
/// releases the shared context when dropped.
fn subscribe_notification(
    kind: NotificationKind,
    capacity: usize,
) -> (BoundedAsyncStream<NotificationEvent>, SubscriptionHandle) {
    /// Unsubscribe routine stored in the returned handle.
    unsafe fn unsubscribe(handle: *mut c_void) {
        // SAFETY: only ever called with the handle returned by the matching
        // `mp_notification_subscribe` call below.
        unsafe { ffi::mp_notification_unsubscribe(handle) };
    }

    let (stream, sender) = BoundedAsyncStream::new(capacity);

    // The sender is boxed and type-erased; `free_notification_sender` restores
    // the type when the context's last reference goes away.
    let erased_sender = Box::into_raw(Box::new(sender)).cast::<c_void>();
    let context = StreamContext::new(erased_sender, free_notification_sender);

    let raw_kind = kind as i32;
    // SAFETY: `context` is a live `StreamContext`, and the retain/release
    // callbacks operate on exactly that type.
    let ptr = unsafe {
        ffi::mp_notification_subscribe(
            raw_kind,
            Some(notification_cb as StreamEventCallback),
            context.cast(),
            context_retain_cb,
            context_release_cb,
        )
    };

    (
        stream,
        SubscriptionHandle {
            ptr,
            context,
            unsubscribe,
        },
    )
}
/// Stream of [`NotificationEvent`]s for
/// [`NotificationKind::NowPlayingItemDidChange`].
///
/// The native subscription lives as long as this value; dropping it
/// unsubscribes.
pub struct NowPlayingItemChangeStream {
    /// Receiving end of the event queue.
    inner: BoundedAsyncStream<NotificationEvent>,
    /// Keeps the native subscription alive until drop.
    _handle: SubscriptionHandle,
}

impl NowPlayingItemChangeStream {
    /// Subscribes to now-playing-item changes, using `capacity` for the
    /// underlying bounded stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let (events, subscription) =
            subscribe_notification(NotificationKind::NowPlayingItemDidChange, capacity);
        Self {
            inner: events,
            _handle: subscription,
        }
    }

    /// Forwards to the underlying stream's `next()`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        self.inner.next()
    }

    /// Forwards to the underlying stream's `try_next()`.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        self.inner.try_next()
    }

    /// Forwards to the underlying stream's `buffered_count()`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Forwards to the underlying stream's `is_closed()`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// Stream of [`NotificationEvent`]s for
/// [`NotificationKind::PlaybackStateDidChange`].
///
/// The native subscription lives as long as this value; dropping it
/// unsubscribes.
pub struct PlaybackStateChangeStream {
    /// Receiving end of the event queue.
    inner: BoundedAsyncStream<NotificationEvent>,
    /// Keeps the native subscription alive until drop.
    _handle: SubscriptionHandle,
}

impl PlaybackStateChangeStream {
    /// Subscribes to playback-state changes, using `capacity` for the
    /// underlying bounded stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let (events, subscription) =
            subscribe_notification(NotificationKind::PlaybackStateDidChange, capacity);
        Self {
            inner: events,
            _handle: subscription,
        }
    }

    /// Forwards to the underlying stream's `next()`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        self.inner.next()
    }

    /// Forwards to the underlying stream's `try_next()`.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        self.inner.try_next()
    }

    /// Forwards to the underlying stream's `buffered_count()`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Forwards to the underlying stream's `is_closed()`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// Stream of [`NotificationEvent`]s for [`NotificationKind::VolumeDidChange`].
///
/// The native subscription lives as long as this value; dropping it
/// unsubscribes.
pub struct VolumeChangeStream {
    /// Receiving end of the event queue.
    inner: BoundedAsyncStream<NotificationEvent>,
    /// Keeps the native subscription alive until drop.
    _handle: SubscriptionHandle,
}

impl VolumeChangeStream {
    /// Subscribes to volume changes, using `capacity` for the underlying
    /// bounded stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let (events, subscription) =
            subscribe_notification(NotificationKind::VolumeDidChange, capacity);
        Self {
            inner: events,
            _handle: subscription,
        }
    }

    /// Forwards to the underlying stream's `next()`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        self.inner.next()
    }

    /// Forwards to the underlying stream's `try_next()`.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        self.inner.try_next()
    }

    /// Forwards to the underlying stream's `buffered_count()`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Forwards to the underlying stream's `is_closed()`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// Stream of [`NotificationEvent`]s for
/// [`NotificationKind::MediaLibraryDidChange`].
///
/// The native subscription lives as long as this value; dropping it
/// unsubscribes.
pub struct MediaLibraryChangeStream {
    /// Receiving end of the event queue.
    inner: BoundedAsyncStream<NotificationEvent>,
    /// Keeps the native subscription alive until drop.
    _handle: SubscriptionHandle,
}

impl MediaLibraryChangeStream {
    /// Subscribes to media-library changes, using `capacity` for the
    /// underlying bounded stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        let (events, subscription) =
            subscribe_notification(NotificationKind::MediaLibraryDidChange, capacity);
        Self {
            inner: events,
            _handle: subscription,
        }
    }

    /// Forwards to the underlying stream's `next()`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NotificationEvent> {
        self.inner.next()
    }

    /// Forwards to the underlying stream's `try_next()`.
    #[must_use]
    pub fn try_next(&self) -> Option<NotificationEvent> {
        self.inner.try_next()
    }

    /// Forwards to the underlying stream's `buffered_count()`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Forwards to the underlying stream's `is_closed()`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// C-layout payload that `remote_command_cb` reads from the native pointer.
///
/// `remote_command_cb` treats a negative integer or a NaN float as "field not
/// present". The layout is pinned by the compile-time assertions below.
#[repr(C)]
#[derive(Clone, Copy)]
struct RawCommandPayload {
    /// Native command identifier, decoded with `Command::from_id`.
    command_id: i32,
    /// Copied verbatim into `CommandEvent::timestamp`.
    timestamp: f64,
    /// Skip interval for skip commands, position for change-position commands.
    extra: f64,
    /// Raw seek type; negative means absent.
    seek_type: i32,
    /// Rating; NaN means absent.
    rating: f64,
    /// Playback rate; NaN means absent.
    playback_rate: f64,
    /// Feedback "negative" flag; negative means absent, otherwise non-zero is true.
    negative: i32,
    /// Raw shuffle type; negative means absent.
    shuffle_type: i32,
    /// Raw repeat type; negative means absent.
    repeat_type: i32,
    /// Tri-state flag; negative means absent, otherwise non-zero is true.
    preserves_shuffle_mode: i32,
    /// Tri-state flag; negative means absent, otherwise non-zero is true.
    preserves_repeat_mode: i32,
    /// Raw language-option setting; negative means absent.
    language_option_setting: i32,
}

// Compile-time layout guard: the build fails if the Rust struct ever stops
// matching the byte layout these numbers describe.
const _: () = {
    assert!(
        core::mem::size_of::<RawCommandPayload>() == 72,
        "RawCommandPayload: unexpected size"
    );
    assert!(
        core::mem::align_of::<RawCommandPayload>() == 8,
        "RawCommandPayload: unexpected alignment"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, command_id) == 0,
        "RawCommandPayload: command_id moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, timestamp) == 8,
        "RawCommandPayload: timestamp moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, extra) == 16,
        "RawCommandPayload: extra moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, seek_type) == 24,
        "RawCommandPayload: seek_type moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, rating) == 32,
        "RawCommandPayload: rating moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, playback_rate) == 40,
        "RawCommandPayload: playback_rate moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, negative) == 48,
        "RawCommandPayload: negative moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, shuffle_type) == 52,
        "RawCommandPayload: shuffle_type moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, repeat_type) == 56,
        "RawCommandPayload: repeat_type moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, preserves_shuffle_mode) == 60,
        "RawCommandPayload: preserves_shuffle_mode moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, preserves_repeat_mode) == 64,
        "RawCommandPayload: preserves_repeat_mode moved"
    );
    assert!(
        core::mem::offset_of!(RawCommandPayload, language_option_setting) == 68,
        "RawCommandPayload: language_option_setting moved"
    );
};
/// Native callback that decodes a [`RawCommandPayload`] into a
/// [`CommandEvent`] and pushes it to the stream's sender.
///
/// Calls with a null `payload` or a null `ctx` are ignored. Optional fields
/// use sentinels: a negative integer or a NaN float becomes `None`.
///
/// # Safety
/// A non-null `payload` must point to a valid, properly aligned
/// `RawCommandPayload`. A non-null `ctx` must point to a live `StreamContext`
/// whose `sender` field is a leaked `Box<AsyncStreamSender<CommandEvent>>`.
unsafe extern "C" fn remote_command_cb(_kind: i32, payload: *const c_void, ctx: *mut c_void) {
    // Both pointers are dereferenced below, so both must be checked; the
    // retain/release callbacks already tolerate a null context.
    if payload.is_null() || ctx.is_null() {
        return;
    }
    // SAFETY: non-null and valid per the contract above.
    let raw = unsafe { &*payload.cast::<RawCommandPayload>() };
    // SAFETY: non-null and live per the contract above.
    let context = unsafe { &*ctx.cast::<StreamContext>() };
    // SAFETY: the context was created with a sender of exactly this type.
    let sender = unsafe { &*context.sender.cast::<AsyncStreamSender<CommandEvent>>() };

    // NOTE(review): an unrecognised command id is reported as `Command::Play`.
    // Confirm whether such events should be dropped instead.
    let command = Command::from_id(raw.command_id).unwrap_or(Command::Play);

    // `extra` is overloaded: its meaning depends on the command.
    let is_skip = matches!(command, Command::SkipForward | Command::SkipBackward);
    let is_seek = matches!(command, Command::SeekForward | Command::SeekBackward);
    let is_reposition = matches!(command, Command::ChangePlaybackPosition);

    let event = CommandEvent {
        command,
        timestamp: raw.timestamp,
        skip_interval: is_skip.then_some(raw.extra).filter(|v| !v.is_nan()),
        seek_type: is_seek
            .then(|| SeekType::from_raw(raw.seek_type))
            .filter(|_| raw.seek_type >= 0),
        position: is_reposition.then_some(raw.extra).filter(|v| !v.is_nan()),
        rating: (!raw.rating.is_nan()).then_some(raw.rating),
        playback_rate: (!raw.playback_rate.is_nan()).then_some(raw.playback_rate),
        feedback_negative: (raw.negative >= 0).then_some(raw.negative != 0),
        shuffle_type: (raw.shuffle_type >= 0).then(|| ShuffleType::from_raw(raw.shuffle_type)),
        repeat_type: (raw.repeat_type >= 0).then(|| RepeatType::from_raw(raw.repeat_type)),
        preserves_shuffle_mode: (raw.preserves_shuffle_mode >= 0)
            .then_some(raw.preserves_shuffle_mode != 0),
        preserves_repeat_mode: (raw.preserves_repeat_mode >= 0)
            .then_some(raw.preserves_repeat_mode != 0),
        // The raw payload carries no language option, only its setting.
        language_option: None,
        language_option_setting: (raw.language_option_setting >= 0)
            .then(|| LanguageOptionSetting::from_raw(raw.language_option_setting)),
    };
    sender.push(event);
}
/// Stream of [`CommandEvent`]s for one remote [`Command`].
///
/// The native subscription lives as long as this value; dropping it
/// unsubscribes.
pub struct RemoteCommandStream {
    /// Receiving end of the event queue.
    inner: BoundedAsyncStream<CommandEvent>,
    /// Keeps the native subscription alive until drop.
    _handle: SubscriptionHandle,
}

impl RemoteCommandStream {
    /// Registers `remote_command_cb` for `command`, using `capacity` for the
    /// underlying bounded stream.
    #[must_use]
    pub fn subscribe(command: Command, capacity: usize) -> Self {
        /// Unsubscribe routine stored in the handle.
        unsafe fn unsubscribe(handle: *mut c_void) {
            // SAFETY: only ever called with the handle returned by the
            // matching subscribe call below.
            unsafe { ffi::mp_stream_remote_command_unsubscribe(handle) };
        }

        let (inner, sender) = BoundedAsyncStream::new(capacity);

        // Box and type-erase the sender; `free_command_sender` restores the
        // type when the context's last reference goes away.
        let erased_sender = Box::into_raw(Box::new(sender)).cast::<c_void>();
        let context = StreamContext::new(erased_sender, free_command_sender);

        let raw_command = command as i32;
        // SAFETY: `context` is a live `StreamContext`, and the retain/release
        // callbacks operate on exactly that type.
        let ptr = unsafe {
            ffi::mp_stream_remote_command_subscribe(
                raw_command,
                Some(remote_command_cb as StreamEventCallback),
                context.cast(),
                context_retain_cb,
                context_release_cb,
            )
        };

        Self {
            inner,
            _handle: SubscriptionHandle {
                ptr,
                context,
                unsubscribe,
            },
        }
    }

    /// Forwards to the underlying stream's `next()`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, CommandEvent> {
        self.inner.next()
    }

    /// Forwards to the underlying stream's `try_next()`.
    #[must_use]
    pub fn try_next(&self) -> Option<CommandEvent> {
        self.inner.try_next()
    }

    /// Forwards to the underlying stream's `buffered_count()`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Forwards to the underlying stream's `is_closed()`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// Kind of now-playing-session event.
///
/// `now_playing_session_cb` maps the native `kind` integer onto these
/// variants; other values are dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum NowPlayingSessionEventKind {
/// Produced for native kind `0`.
DidChangeActive,
/// Produced for native kind `1`.
DidChangeCanBecomeActive,
}
/// Event pushed into `NowPlayingSessionStream` by `now_playing_session_cb`.
///
/// It carries only the event kind; the callback ignores the native payload
/// pointer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NowPlayingSessionEvent {
/// Which session event fired.
pub kind: NowPlayingSessionEventKind,
}
/// Native callback that turns a raw session event into a
/// [`NowPlayingSessionEvent`] and pushes it to the stream's sender.
///
/// Calls with a null `ctx` or an unrecognised `kind` are ignored. The payload
/// pointer is not read.
///
/// # Safety
/// A non-null `ctx` must point to a live `StreamContext` whose `sender` field
/// is a leaked `Box<AsyncStreamSender<NowPlayingSessionEvent>>`.
unsafe extern "C" fn now_playing_session_cb(kind: i32, _payload: *const c_void, ctx: *mut c_void) {
    // The retain/release callbacks tolerate a null context, so this one must
    // as well: dereferencing null below would be undefined behaviour.
    if ctx.is_null() {
        return;
    }
    let event_kind = match kind {
        0 => NowPlayingSessionEventKind::DidChangeActive,
        1 => NowPlayingSessionEventKind::DidChangeCanBecomeActive,
        // Unknown native values are dropped rather than misreported.
        _ => return,
    };
    // SAFETY: `ctx` is non-null and, per the contract above, a live context.
    let context = unsafe { &*ctx.cast::<StreamContext>() };
    // SAFETY: the context was created with a sender of exactly this type.
    let sender = unsafe {
        &*context
            .sender
            .cast::<AsyncStreamSender<NowPlayingSessionEvent>>()
    };
    sender.push(NowPlayingSessionEvent { kind: event_kind });
}
/// Stream of [`NowPlayingSessionEvent`]s.
///
/// The native subscription lives as long as this value; dropping it
/// unsubscribes.
pub struct NowPlayingSessionStream {
    /// Receiving end of the event queue.
    inner: BoundedAsyncStream<NowPlayingSessionEvent>,
    /// Keeps the native subscription alive until drop.
    _handle: SubscriptionHandle,
}

impl NowPlayingSessionStream {
    /// Registers `now_playing_session_cb` with the native layer, using
    /// `capacity` for the underlying bounded stream.
    #[must_use]
    pub fn subscribe(capacity: usize) -> Self {
        /// Unsubscribe routine stored in the handle.
        unsafe fn unsubscribe(handle: *mut c_void) {
            // SAFETY: only ever called with the handle returned by the
            // matching subscribe call below.
            unsafe { ffi::mp_now_playing_session_stream_unsubscribe(handle) };
        }

        let (inner, sender) = BoundedAsyncStream::new(capacity);

        // Box and type-erase the sender; `free_session_sender` restores the
        // type when the context's last reference goes away.
        let erased_sender = Box::into_raw(Box::new(sender)).cast::<c_void>();
        let context = StreamContext::new(erased_sender, free_session_sender);

        // SAFETY: `context` is a live `StreamContext`, and the retain/release
        // callbacks operate on exactly that type.
        let ptr = unsafe {
            ffi::mp_now_playing_session_stream_subscribe(
                Some(now_playing_session_cb as StreamEventCallback),
                context.cast(),
                context_retain_cb,
                context_release_cb,
            )
        };

        Self {
            inner,
            _handle: SubscriptionHandle {
                ptr,
                context,
                unsubscribe,
            },
        }
    }

    /// Forwards to the underlying stream's `next()`.
    #[must_use]
    pub fn next(&self) -> doom_fish_utils::stream::NextItem<'_, NowPlayingSessionEvent> {
        self.inner.next()
    }

    /// Forwards to the underlying stream's `try_next()`.
    #[must_use]
    pub fn try_next(&self) -> Option<NowPlayingSessionEvent> {
        self.inner.try_next()
    }

    /// Forwards to the underlying stream's `buffered_count()`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Forwards to the underlying stream's `is_closed()`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}