use std::sync::mpsc;
use std::thread;
use core::ffi::c_void;
use core::ptr;
use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream, NextItem};
use crate::ffi_impl as ffi;
use crate::hid::{
clone_value_ref, HidDevice, HidInputReport, HidManager, HidQueue, HidReportType, HidValue,
};
/// An input value together with the device recorded as its source.
///
/// Events of this type are pushed by `mgr_input_value_cb`, which builds `device` from the
/// callback's `sender` pointer and `value` by passing the raw value through
/// `clone_value_ref`.
#[derive(Clone)]
pub struct InputValueEvent {
/// Device wrapper built from the callback's `sender` argument.
pub device: HidDevice,
/// Value obtained from `clone_value_ref` for the reported raw value.
pub value: HidValue,
}
/// Event pushed by `mgr_device_matching_cb`, the callback registered through
/// `IOHIDManagerRegisterDeviceMatchingCallback`.
#[derive(Clone)]
pub struct DeviceMatchingEvent {
/// Device wrapper built from the callback's `device` argument.
pub device: HidDevice,
}
/// Event pushed by `mgr_device_removal_cb`, the callback registered through
/// `IOHIDManagerRegisterDeviceRemovalCallback`.
#[derive(Clone)]
pub struct DeviceRemovalEvent {
/// Device wrapper built from the callback's `device` argument.
pub device: HidDevice,
}
/// A raw input report together with the device recorded as its source.
///
/// Events of this type are pushed by `mgr_input_report_cb`.
#[derive(Clone)]
pub struct InputReportEvent {
/// Device wrapper built from the callback's `sender` argument.
pub device: HidDevice,
/// Report type, id and a copy of the report bytes. `mgr_input_report_cb` always sets
/// the report's `timestamp` field to `0`.
pub report: HidInputReport,
}
/// Wrapper that lets a `CFRunLoopRef` travel through an `mpsc` channel.
///
/// The `subscribe` functions in this file use it to hand the worker thread's run loop back
/// to the subscribing thread.
struct SendableRunLoop(ffi::CFRunLoopRef);
// NOTE(review): this asserts the run-loop reference may be moved between threads. In this
// file the received pointer is only ever passed to `CFRunLoopStop`; presumably that call is
// allowed from another thread — confirm against the CoreFoundation documentation.
unsafe impl Send for SendableRunLoop {}
/// Ties the lifetime of a run-loop worker thread to the value that owns this guard.
///
/// Its `Drop` implementation stops `run_loop` (when non-null) and joins `thread`.
struct RunLoopThreadGuard {
// Run loop reported by the worker thread; null when none was received.
run_loop: ffi::CFRunLoopRef,
// Join handle of the worker; taken (set to `None`) when the guard is dropped.
thread: Option<thread::JoinHandle<()>>,
}
// NOTE(review): the raw `run_loop` pointer makes this type `!Send`/`!Sync` by default. These
// impls presumably rely on the pointer only being used for `CFRunLoopStop` in `drop` —
// confirm that this is sound for every use of the guard.
unsafe impl Send for RunLoopThreadGuard {}
unsafe impl Sync for RunLoopThreadGuard {}
impl Drop for RunLoopThreadGuard {
    /// Asks the worker's run loop to stop (if one was recorded) and then waits for the
    /// worker thread to finish. A panic in the worker is ignored.
    fn drop(&mut self) {
        let run_loop = self.run_loop;
        if !run_loop.is_null() {
            // NOTE(review): the pointer is not retained by this guard; this presumably
            // relies on the worker thread (and its run loop) still being alive — confirm.
            unsafe { ffi::CFRunLoopStop(run_loop) };
        }
        if let Some(worker) = self.thread.take() {
            // The join result (worker panic payload) is deliberately discarded.
            let _ = worker.join();
        }
    }
}
/// Manager-level input-value callback registered by `ManagerInputValueStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<InputValueEvent>` pointer supplied at registration.
/// The call is ignored when `result` is not `kIOReturnSuccess`, when any pointer is null,
/// or when `clone_value_ref` yields `None`.
unsafe extern "C" fn mgr_input_value_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    sender: *mut c_void,
    value: ffi::IOHIDValueRef,
) {
    let usable = result == ffi::kIOReturnSuccess
        && !context.is_null()
        && !sender.is_null()
        && !value.is_null();
    if !usable {
        return;
    }
    let value = match clone_value_ref(value) {
        Some(cloned) => cloned,
        None => return,
    };
    // The callback's `sender` argument is treated as the originating device pointer.
    let device = HidDevice::from_raw_retained(sender.cast());
    let events = unsafe { &*context.cast::<AsyncStreamSender<InputValueEvent>>() };
    events.push(InputValueEvent { device, value });
}
/// Stream of `InputValueEvent`s fed by a worker thread started in `subscribe`.
pub struct ManagerInputValueStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<InputValueEvent>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for ManagerInputValueStream {}
unsafe impl Sync for ManagerInputValueStream {}
impl ManagerInputValueStream {
#[must_use]
pub fn subscribe(manager: &HidManager, capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
let manager_raw = manager.as_ptr() as usize;
unsafe { ffi::CFRetain((manager_raw as ffi::IOHIDManagerRef).cast_const()) };
let (tx, rx) = mpsc::channel::<SendableRunLoop>();
let thread = thread::spawn(move || {
let manager_raw = manager_raw as ffi::IOHIDManagerRef;
let sender_ptr = sender_ptr as *mut AsyncStreamSender<InputValueEvent>;
let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
unsafe {
ffi::IOHIDManagerScheduleWithRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::IOHIDManagerRegisterInputValueCallback(
manager_raw,
Some(mgr_input_value_cb),
sender_ptr.cast(),
);
}
tx.send(SendableRunLoop(run_loop)).ok();
unsafe { ffi::CFRunLoopRun() };
unsafe {
ffi::IOHIDManagerRegisterInputValueCallback(
manager_raw,
None,
ptr::null_mut(),
);
ffi::IOHIDManagerUnscheduleFromRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::CFRelease(manager_raw.cast_const());
drop(Box::from_raw(sender_ptr));
}
});
let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
Self {
inner: stream,
_guard: RunLoopThreadGuard {
run_loop,
thread: Some(thread),
},
}
}
#[must_use]
pub const fn next(&self) -> NextItem<'_, InputValueEvent> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<InputValueEvent> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
/// Manager-level device-matching callback registered by
/// `ManagerDeviceMatchingStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<DeviceMatchingEvent>` pointer supplied at
/// registration. The call is ignored when `result` is not `kIOReturnSuccess` or when
/// `context` or `device` is null.
unsafe extern "C" fn mgr_device_matching_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    _sender: *mut c_void,
    device: ffi::IOHIDDeviceRef,
) {
    if result != ffi::kIOReturnSuccess {
        return;
    }
    if context.is_null() || device.is_null() {
        return;
    }
    let device = HidDevice::from_raw_retained(device);
    let events = unsafe { &*context.cast::<AsyncStreamSender<DeviceMatchingEvent>>() };
    events.push(DeviceMatchingEvent { device });
}
/// Stream of `DeviceMatchingEvent`s fed by a worker thread started in `subscribe`.
pub struct ManagerDeviceMatchingStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<DeviceMatchingEvent>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for ManagerDeviceMatchingStream {}
unsafe impl Sync for ManagerDeviceMatchingStream {}
impl ManagerDeviceMatchingStream {
#[must_use]
pub fn subscribe(manager: &HidManager, capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
let manager_raw = manager.as_ptr() as usize;
unsafe { ffi::CFRetain((manager_raw as ffi::IOHIDManagerRef).cast_const()) };
let (tx, rx) = mpsc::channel::<SendableRunLoop>();
let thread = thread::spawn(move || {
let manager_raw = manager_raw as ffi::IOHIDManagerRef;
let sender_ptr = sender_ptr as *mut AsyncStreamSender<DeviceMatchingEvent>;
let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
unsafe {
ffi::IOHIDManagerScheduleWithRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::IOHIDManagerRegisterDeviceMatchingCallback(
manager_raw,
Some(mgr_device_matching_cb),
sender_ptr.cast(),
);
}
tx.send(SendableRunLoop(run_loop)).ok();
unsafe { ffi::CFRunLoopRun() };
unsafe {
ffi::IOHIDManagerRegisterDeviceMatchingCallback(
manager_raw,
None,
ptr::null_mut(),
);
ffi::IOHIDManagerUnscheduleFromRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::CFRelease(manager_raw.cast_const());
drop(Box::from_raw(sender_ptr));
}
});
let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
Self {
inner: stream,
_guard: RunLoopThreadGuard {
run_loop,
thread: Some(thread),
},
}
}
#[must_use]
pub const fn next(&self) -> NextItem<'_, DeviceMatchingEvent> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<DeviceMatchingEvent> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
/// Manager-level device-removal callback registered by
/// `ManagerDeviceRemovalStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<DeviceRemovalEvent>` pointer supplied at
/// registration. The call is ignored when `result` is not `kIOReturnSuccess` or when
/// `context` or `device` is null.
unsafe extern "C" fn mgr_device_removal_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    _sender: *mut c_void,
    device: ffi::IOHIDDeviceRef,
) {
    if result != ffi::kIOReturnSuccess {
        return;
    }
    if context.is_null() || device.is_null() {
        return;
    }
    let device = HidDevice::from_raw_retained(device);
    let events = unsafe { &*context.cast::<AsyncStreamSender<DeviceRemovalEvent>>() };
    events.push(DeviceRemovalEvent { device });
}
/// Stream of `DeviceRemovalEvent`s fed by a worker thread started in `subscribe`.
pub struct ManagerDeviceRemovalStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<DeviceRemovalEvent>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for ManagerDeviceRemovalStream {}
unsafe impl Sync for ManagerDeviceRemovalStream {}
impl ManagerDeviceRemovalStream {
#[must_use]
pub fn subscribe(manager: &HidManager, capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
let manager_raw = manager.as_ptr() as usize;
unsafe { ffi::CFRetain((manager_raw as ffi::IOHIDManagerRef).cast_const()) };
let (tx, rx) = mpsc::channel::<SendableRunLoop>();
let thread = thread::spawn(move || {
let manager_raw = manager_raw as ffi::IOHIDManagerRef;
let sender_ptr = sender_ptr as *mut AsyncStreamSender<DeviceRemovalEvent>;
let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
unsafe {
ffi::IOHIDManagerScheduleWithRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::IOHIDManagerRegisterDeviceRemovalCallback(
manager_raw,
Some(mgr_device_removal_cb),
sender_ptr.cast(),
);
}
tx.send(SendableRunLoop(run_loop)).ok();
unsafe { ffi::CFRunLoopRun() };
unsafe {
ffi::IOHIDManagerRegisterDeviceRemovalCallback(
manager_raw,
None,
ptr::null_mut(),
);
ffi::IOHIDManagerUnscheduleFromRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::CFRelease(manager_raw.cast_const());
drop(Box::from_raw(sender_ptr));
}
});
let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
Self {
inner: stream,
_guard: RunLoopThreadGuard {
run_loop,
thread: Some(thread),
},
}
}
#[must_use]
pub const fn next(&self) -> NextItem<'_, DeviceRemovalEvent> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<DeviceRemovalEvent> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
/// Manager-level input-report callback registered by `ManagerInputReportStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<InputReportEvent>` pointer supplied at registration.
/// The call is ignored when `result` is not `kIOReturnSuccess` or when `context`, `sender`
/// or `report` is null. The report bytes are copied into an owned `Vec` before being
/// pushed; the pushed report's `timestamp` is always `0`.
unsafe extern "C" fn mgr_input_report_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    sender: *mut c_void,
    report_type: ffi::IOHIDReportType,
    report_id: u32,
    report: *mut u8,
    report_length: ffi::CFIndex,
) {
    if result != ffi::kIOReturnSuccess {
        return;
    }
    if context.is_null() || sender.is_null() || report.is_null() {
        return;
    }
    // A length that cannot be represented as `usize` is treated as an empty report.
    let byte_count = usize::try_from(report_length).unwrap_or_default();
    let payload = unsafe { core::slice::from_raw_parts(report.cast_const(), byte_count) };
    let bytes = payload.to_vec();
    // The callback's `sender` argument is treated as the originating device pointer.
    let device = HidDevice::from_raw_retained(sender.cast());
    let events = unsafe { &*context.cast::<AsyncStreamSender<InputReportEvent>>() };
    let input_report = HidInputReport {
        report_type: HidReportType::from_raw(report_type),
        report_id,
        bytes,
        timestamp: 0,
    };
    events.push(InputReportEvent {
        device,
        report: input_report,
    });
}
/// Stream of `InputReportEvent`s fed by a worker thread started in `subscribe`.
pub struct ManagerInputReportStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<InputReportEvent>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for ManagerInputReportStream {}
unsafe impl Sync for ManagerInputReportStream {}
impl ManagerInputReportStream {
#[must_use]
pub fn subscribe(manager: &HidManager, capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
let manager_raw = manager.as_ptr() as usize;
unsafe { ffi::CFRetain((manager_raw as ffi::IOHIDManagerRef).cast_const()) };
let (tx, rx) = mpsc::channel::<SendableRunLoop>();
let thread = thread::spawn(move || {
let manager_raw = manager_raw as ffi::IOHIDManagerRef;
let sender_ptr = sender_ptr as *mut AsyncStreamSender<InputReportEvent>;
let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
unsafe {
ffi::IOHIDManagerScheduleWithRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::IOHIDManagerRegisterInputReportCallback(
manager_raw,
Some(mgr_input_report_cb),
sender_ptr.cast(),
);
}
tx.send(SendableRunLoop(run_loop)).ok();
unsafe { ffi::CFRunLoopRun() };
unsafe {
ffi::IOHIDManagerRegisterInputReportCallback(
manager_raw,
None,
ptr::null_mut(),
);
ffi::IOHIDManagerUnscheduleFromRunLoop(
manager_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::CFRelease(manager_raw.cast_const());
drop(Box::from_raw(sender_ptr));
}
});
let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
Self {
inner: stream,
_guard: RunLoopThreadGuard {
run_loop,
thread: Some(thread),
},
}
}
#[must_use]
pub const fn next(&self) -> NextItem<'_, InputReportEvent> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<InputReportEvent> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
/// Device-level removal callback registered by `DeviceRemovalStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<()>` pointer supplied at registration. A unit item
/// is pushed for every call with a non-null `context` and a `kIOReturnSuccess` result.
unsafe extern "C" fn dev_removal_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    _sender: *mut c_void,
) {
    let usable = !context.is_null() && result == ffi::kIOReturnSuccess;
    if !usable {
        return;
    }
    let notifier = unsafe { &*context.cast::<AsyncStreamSender<()>>() };
    notifier.push(());
}
/// Stream of unit notifications pushed by `dev_removal_cb` for a single device.
pub struct DeviceRemovalStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<()>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for DeviceRemovalStream {}
unsafe impl Sync for DeviceRemovalStream {}
impl DeviceRemovalStream {
    /// Spawns a worker thread that opens `device`, schedules it on the worker's run loop
    /// and forwards removal notifications into a bounded stream created with `capacity`.
    ///
    /// The device is retained with `CFRetain` before the worker starts and released by the
    /// worker before it exits. Dropping the returned stream stops the run loop (if one is
    /// running) and joins the worker.
    ///
    /// If `IOHIDDeviceOpen` fails, the worker registers nothing, never runs a run loop and
    /// exits immediately after dropping the sender. In that case no run loop is published
    /// to the guard: the worker's run loop belongs to a thread that is about to exit, so
    /// calling `CFRunLoopStop` on it later would touch a dangling reference.
    #[must_use]
    pub fn subscribe(device: &HidDevice, capacity: usize) -> Self {
        let (stream, sender) = BoundedAsyncStream::new(capacity);
        // Raw pointers are not `Send`, so both addresses cross into the worker as `usize`.
        let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
        let device_raw = device.as_ptr() as usize;
        unsafe { ffi::CFRetain((device_raw as ffi::IOHIDDeviceRef).cast_const()) };
        let (tx, rx) = mpsc::channel::<SendableRunLoop>();
        let thread = thread::spawn(move || {
            let device_raw = device_raw as ffi::IOHIDDeviceRef;
            let sender_ptr = sender_ptr as *mut AsyncStreamSender<()>;
            let open_ok = unsafe {
                ffi::IOHIDDeviceOpen(device_raw, ffi::kIOHIDOptionsTypeNone)
                    == ffi::kIOReturnSuccess
            };
            let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
            if open_ok {
                unsafe {
                    ffi::IOHIDDeviceScheduleWithRunLoop(
                        device_raw,
                        run_loop,
                        ffi::kCFRunLoopDefaultMode,
                    );
                    ffi::IOHIDDeviceRegisterRemovalCallback(
                        device_raw,
                        Some(dev_removal_cb),
                        sender_ptr.cast(),
                    );
                }
            }
            // Only publish the run loop when this thread is actually going to run it;
            // otherwise the guard would later stop a run loop whose thread has exited.
            let published = if open_ok { run_loop } else { ptr::null_mut() };
            tx.send(SendableRunLoop(published)).ok();
            if open_ok {
                unsafe { ffi::CFRunLoopRun() };
            }
            unsafe {
                if open_ok {
                    // Unregister the callback before freeing the sender it points at.
                    ffi::IOHIDDeviceRegisterRemovalCallback(device_raw, None, ptr::null_mut());
                    ffi::IOHIDDeviceUnscheduleFromRunLoop(
                        device_raw,
                        run_loop,
                        ffi::kCFRunLoopDefaultMode,
                    );
                    let _ = ffi::IOHIDDeviceClose(device_raw, ffi::kIOHIDOptionsTypeNone);
                }
                ffi::CFRelease(device_raw.cast_const());
                drop(Box::from_raw(sender_ptr));
            }
        });
        // Null when the worker published none (open failure) or ended without sending.
        let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
        Self {
            inner: stream,
            _guard: RunLoopThreadGuard {
                run_loop,
                thread: Some(thread),
            },
        }
    }

    /// Delegates to the inner stream's `next`.
    #[must_use]
    pub const fn next(&self) -> NextItem<'_, ()> {
        self.inner.next()
    }

    /// Delegates to the inner stream's `try_next`.
    #[must_use]
    pub fn try_next(&self) -> Option<()> {
        self.inner.try_next()
    }

    /// Delegates to the inner stream's `buffered_count`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Delegates to the inner stream's `is_closed`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// Device-level input-value callback registered by `DeviceInputValueStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<HidValue>` pointer supplied at registration. The
/// call is ignored when `result` is not `kIOReturnSuccess`, when `context` or `value` is
/// null, or when `clone_value_ref` yields `None`.
unsafe extern "C" fn dev_input_value_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    _sender: *mut c_void,
    value: ffi::IOHIDValueRef,
) {
    if result != ffi::kIOReturnSuccess {
        return;
    }
    if context.is_null() || value.is_null() {
        return;
    }
    let value = match clone_value_ref(value) {
        Some(cloned) => cloned,
        None => return,
    };
    let values = unsafe { &*context.cast::<AsyncStreamSender<HidValue>>() };
    values.push(value);
}
/// Stream of `HidValue`s pushed by `dev_input_value_cb` for a single device.
pub struct DeviceInputValueStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<HidValue>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for DeviceInputValueStream {}
unsafe impl Sync for DeviceInputValueStream {}
impl DeviceInputValueStream {
    /// Spawns a worker thread that opens `device`, schedules it on the worker's run loop
    /// and forwards its input values into a bounded stream created with `capacity`.
    ///
    /// The device is retained with `CFRetain` before the worker starts and released by the
    /// worker before it exits. Dropping the returned stream stops the run loop (if one is
    /// running) and joins the worker.
    ///
    /// If `IOHIDDeviceOpen` fails, the worker registers nothing, never runs a run loop and
    /// exits immediately after dropping the sender. In that case no run loop is published
    /// to the guard: the worker's run loop belongs to a thread that is about to exit, so
    /// calling `CFRunLoopStop` on it later would touch a dangling reference.
    #[must_use]
    pub fn subscribe(device: &HidDevice, capacity: usize) -> Self {
        let (stream, sender) = BoundedAsyncStream::new(capacity);
        // Raw pointers are not `Send`, so both addresses cross into the worker as `usize`.
        let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
        let device_raw = device.as_ptr() as usize;
        unsafe { ffi::CFRetain((device_raw as ffi::IOHIDDeviceRef).cast_const()) };
        let (tx, rx) = mpsc::channel::<SendableRunLoop>();
        let thread = thread::spawn(move || {
            let device_raw = device_raw as ffi::IOHIDDeviceRef;
            let sender_ptr = sender_ptr as *mut AsyncStreamSender<HidValue>;
            let open_ok = unsafe {
                ffi::IOHIDDeviceOpen(device_raw, ffi::kIOHIDOptionsTypeNone)
                    == ffi::kIOReturnSuccess
            };
            let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
            if open_ok {
                unsafe {
                    ffi::IOHIDDeviceScheduleWithRunLoop(
                        device_raw,
                        run_loop,
                        ffi::kCFRunLoopDefaultMode,
                    );
                    ffi::IOHIDDeviceRegisterInputValueCallback(
                        device_raw,
                        Some(dev_input_value_cb),
                        sender_ptr.cast(),
                    );
                }
            }
            // Only publish the run loop when this thread is actually going to run it;
            // otherwise the guard would later stop a run loop whose thread has exited.
            let published = if open_ok { run_loop } else { ptr::null_mut() };
            tx.send(SendableRunLoop(published)).ok();
            if open_ok {
                unsafe { ffi::CFRunLoopRun() };
            }
            unsafe {
                if open_ok {
                    // Unregister the callback before freeing the sender it points at.
                    ffi::IOHIDDeviceRegisterInputValueCallback(
                        device_raw,
                        None,
                        ptr::null_mut(),
                    );
                    ffi::IOHIDDeviceUnscheduleFromRunLoop(
                        device_raw,
                        run_loop,
                        ffi::kCFRunLoopDefaultMode,
                    );
                    let _ = ffi::IOHIDDeviceClose(device_raw, ffi::kIOHIDOptionsTypeNone);
                }
                ffi::CFRelease(device_raw.cast_const());
                drop(Box::from_raw(sender_ptr));
            }
        });
        // Null when the worker published none (open failure) or ended without sending.
        let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
        Self {
            inner: stream,
            _guard: RunLoopThreadGuard {
                run_loop,
                thread: Some(thread),
            },
        }
    }

    /// Delegates to the inner stream's `next`.
    #[must_use]
    pub const fn next(&self) -> NextItem<'_, HidValue> {
        self.inner.next()
    }

    /// Delegates to the inner stream's `try_next`.
    #[must_use]
    pub fn try_next(&self) -> Option<HidValue> {
        self.inner.try_next()
    }

    /// Delegates to the inner stream's `buffered_count`.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.inner.buffered_count()
    }

    /// Delegates to the inner stream's `is_closed`.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
/// Queue value-available callback registered by `QueueValueStream::subscribe`.
///
/// `context` is the `AsyncStreamSender<()>` pointer supplied at registration. A unit item
/// is pushed for every call with a non-null `context` and a `kIOReturnSuccess` result.
unsafe extern "C" fn queue_value_available_cb(
    context: *mut c_void,
    result: ffi::IOReturn,
    _sender: *mut c_void,
) {
    let usable = !context.is_null() && result == ffi::kIOReturnSuccess;
    if !usable {
        return;
    }
    let notifier = unsafe { &*context.cast::<AsyncStreamSender<()>>() };
    notifier.push(());
}
/// Stream of unit notifications pushed by `queue_value_available_cb` for a single queue.
pub struct QueueValueStream {
// Consumer side of the bounded stream; the paired sender is handed to the HID callback.
inner: BoundedAsyncStream<()>,
// Held only for its `Drop`, which stops the worker's run loop and joins the thread.
_guard: RunLoopThreadGuard,
}
// NOTE(review): these impls assert cross-thread use is sound; that presumably depends on
// `BoundedAsyncStream` being thread-safe — confirm.
unsafe impl Send for QueueValueStream {}
unsafe impl Sync for QueueValueStream {}
impl QueueValueStream {
#[must_use]
pub fn subscribe(queue: &HidQueue, capacity: usize) -> Self {
let (stream, sender) = BoundedAsyncStream::new(capacity);
let sender_ptr = Box::into_raw(Box::new(sender)) as usize;
let queue_raw = queue.as_ptr() as usize;
unsafe { ffi::CFRetain((queue_raw as ffi::IOHIDQueueRef).cast_const()) };
let (tx, rx) = mpsc::channel::<SendableRunLoop>();
let thread = thread::spawn(move || {
let queue_raw = queue_raw as ffi::IOHIDQueueRef;
let sender_ptr = sender_ptr as *mut AsyncStreamSender<()>;
let run_loop = unsafe { ffi::CFRunLoopGetCurrent() };
unsafe {
ffi::IOHIDQueueScheduleWithRunLoop(
queue_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::IOHIDQueueRegisterValueAvailableCallback(
queue_raw,
Some(queue_value_available_cb),
sender_ptr.cast(),
);
}
tx.send(SendableRunLoop(run_loop)).ok();
unsafe { ffi::CFRunLoopRun() };
unsafe {
ffi::IOHIDQueueRegisterValueAvailableCallback(queue_raw, None, ptr::null_mut());
ffi::IOHIDQueueUnscheduleFromRunLoop(
queue_raw,
run_loop,
ffi::kCFRunLoopDefaultMode,
);
ffi::CFRelease(queue_raw.cast_const());
drop(Box::from_raw(sender_ptr));
}
});
let run_loop = rx.recv().map_or(ptr::null_mut(), |s| s.0);
Self {
inner: stream,
_guard: RunLoopThreadGuard {
run_loop,
thread: Some(thread),
},
}
}
#[must_use]
pub const fn next(&self) -> NextItem<'_, ()> {
self.inner.next()
}
#[must_use]
pub fn try_next(&self) -> Option<()> {
self.inner.try_next()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.inner.buffered_count()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}