use crate::watcher::{Watcher, EPOLLIN};
use std::convert::TryInto;
use std::mem;
use std::mem::MaybeUninit;
use std::os::raw;
use std::sync::Once;
use std::sync::{Condvar, Mutex};
use std::task::Waker;
use std::thread;
use std::os::unix::io::RawFd;
// `op` arguments handed to `epoll_ctl` below: add a descriptor to / remove a
// descriptor from the epoll interest list.
const EPOLL_CTL_ADD: i32 = 1;
const EPOLL_CTL_DEL: i32 = 2;
// Flag passed to both `epoll_create1` and `pipe2` in this file.
// NOTE(review): the value is hard-coded rather than taken from libc; presumably
// it matches `O_CLOEXEC` on the supported targets — confirm per architecture.
const O_CLOEXEC: raw::c_int = 0x0008_0000;
/// User-data slot carried inside an [`EpollEvent`].
///
/// Declared with every member so the union has the size expected by the
/// foreign `epoll_*` functions; the code in this file only ever writes and
/// reads the `uint64` member, which it uses to store a device ID (0 is used
/// for the message pipe).
#[repr(C)]
#[derive(Copy, Clone)]
union EpollData {
ptr: *mut raw::c_void,
fd: RawFd,
uint32: u32,
uint64: u64,
}
/// Event record exchanged with `epoll_ctl` / `epoll_wait`.
///
/// NOTE(review): `packed` presumably mirrors the layout of the kernel's
/// `struct epoll_event` on the intended target — confirm for every
/// architecture this is built for.
#[repr(packed, C)]
#[derive(Copy, Clone)]
struct EpollEvent {
// Bit mask of event flags (e.g. `EPOLLIN`).
events: u32,
// Opaque user data handed back by `epoll_wait` for this registration.
data: EpollData,
}
// Local aliases used for the return type and byte-count parameter of the
// foreign `read` / `write` declarations below.
#[allow(non_camel_case_types)]
type c_ssize = isize;
#[allow(non_camel_case_types)]
type c_size = usize;
// Foreign functions this module calls directly instead of going through a
// bindings crate. Every call site treats a negative return value as failure.
extern "C" {
// Creates an epoll instance and returns its file descriptor.
fn epoll_create1(flags: raw::c_int) -> raw::c_int;
// Adds (`EPOLL_CTL_ADD`) or removes (`EPOLL_CTL_DEL`) `fd` on `epfd`.
fn epoll_ctl(
epfd: RawFd,
op: raw::c_int,
fd: RawFd,
event: *mut EpollEvent,
) -> raw::c_int;
// Waits for events on `epfd`, storing up to `maxevents` records in `events`;
// this file always passes `maxevents = 1` and `timeout = -1`.
fn epoll_wait(
epfd: RawFd,
events: *mut EpollEvent,
maxevents: raw::c_int,
timeout: raw::c_int,
) -> raw::c_int;
// Fills `pipefd` with the two descriptors of a new pipe; the caller in this
// file treats element 0 as the read end and element 1 as the write end.
fn pipe2(pipefd: *mut [RawFd; 2], flags: raw::c_int) -> raw::c_int;
// Raw byte I/O on a descriptor; used to move `Message` values through the pipe.
fn write(fd: RawFd, buf: *const raw::c_void, count: c_size) -> c_ssize;
fn read(fd: RawFd, buf: *mut raw::c_void, count: c_size) -> c_ssize;
}
// Guards the one-time setup performed in `Device::new` (pipe creation,
// `SHARED_CONTEXT` initialisation and spawning of the hardware thread).
static INIT: Once = Once::new();
// Process-wide state; starts uninitialised, is written inside the `INIT`
// closure and is afterwards accessed through `context()`.
static mut SHARED_CONTEXT: SharedContext = SharedContext::new();
/// Identifier of a registered device.
///
/// IDs handed out by `Context` start at 1; the value 0 is used as the epoll
/// user data of the message pipe. The hardware thread stores the waker of
/// device `id` at index `id - 1`.
#[derive(Debug)]
struct DeviceID(u64);
/// Command sent to the hardware thread; values are copied bytewise through
/// the pipe with `write` and reconstructed with `read`.
#[derive(Debug)]
enum Message {
// Register the descriptor with epoll under the given ID, using the watcher's
// inner value as the event mask.
Device(DeviceID, RawFd, Watcher),
// Store the waker to be woken on the next event for the given ID.
Waker(DeviceID, Waker),
// Remove the descriptor from epoll, then set the flag to `true` and notify
// the condition variable behind the pointer.
Disconnect(RawFd, *const (Mutex<bool>, Condvar)),
}
/// Event loop of the background thread spawned by `Device::new`.
///
/// Creates an epoll instance, registers the read end of the message pipe
/// (`recver`) under ID 0 and then loops forever:
///
/// * an event with ID 0 means a `Message` is waiting in the pipe; it is read
///   and applied (register a device, store a waker, or disconnect a device);
/// * an event with any other ID wakes (and clears) the waker stored for that
///   device, if there is one.
///
/// # Panics
///
/// Panics if creating or modifying the epoll instance fails, or if a read
/// from the pipe does not yield exactly one whole `Message`.
fn hardware_thread(recver: RawFd) {
    // Slot `id - 1` holds the pending waker of device `id`.
    let mut wakers: Vec<Option<Waker>> = vec![None];

    let epoll_fd = unsafe { epoll_create1(O_CLOEXEC) };
    error(epoll_fd).unwrap();

    // The message pipe is registered with user data 0.
    unsafe {
        error(epoll_ctl(
            epoll_fd,
            EPOLL_CTL_ADD,
            recver,
            &mut EpollEvent {
                events: EPOLLIN,
                data: EpollData { uint64: 0 },
            },
        ))
        .unwrap();
    }

    loop {
        let mut ev = MaybeUninit::<EpollEvent>::uninit();
        let ready = unsafe { epoll_wait(epoll_fd, ev.as_mut_ptr(), 1, -1) };
        // Retry on failure (negative) and also when no event was written
        // (zero); in both cases `ev` is still uninitialised.
        if ready < 1 {
            continue;
        }
        // SAFETY: `epoll_wait` reported one event, so it filled in `ev`.
        let index = DeviceID(unsafe { ev.assume_init().data.uint64 });

        if index.0 == 0 {
            let mut buffer = mem::MaybeUninit::<Message>::uninit();
            let len = unsafe {
                read(
                    recver,
                    buffer.as_mut_ptr().cast(),
                    mem::size_of::<Message>(),
                )
            };
            // Validate the length BEFORE treating the buffer as a `Message`:
            // after a short or failed read the bytes are not a valid value and
            // must never be materialised (or dropped while unwinding).
            assert_eq!(len as usize, mem::size_of::<Message>());
            // SAFETY: the read filled the whole buffer with the bytes of a
            // `Message` written by `Device`.
            let message = unsafe { buffer.assume_init() };

            match message {
                Message::Device(device_id, device_fd, events) => {
                    let index: usize = device_id.0.try_into().unwrap();
                    // Grow (never shrink) so that slot `index - 1` exists.
                    wakers.resize(wakers.len().max(index), None);
                    unsafe {
                        error(epoll_ctl(
                            epoll_fd,
                            EPOLL_CTL_ADD,
                            device_fd,
                            &mut EpollEvent {
                                events: events.0,
                                data: EpollData {
                                    uint64: device_id.0,
                                },
                            },
                        ))
                        .unwrap();
                    }
                }
                Message::Waker(device_id, waker) => {
                    let index: usize = device_id.0.try_into().unwrap();
                    wakers[index - 1] = Some(waker);
                }
                Message::Disconnect(device_fd, pair) => unsafe {
                    error(epoll_ctl(
                        epoll_fd,
                        EPOLL_CTL_DEL,
                        device_fd,
                        &mut EpollEvent {
                            events: 0,
                            data: EpollData { uint64: 0 },
                        },
                    ))
                    .unwrap();
                    // The sender blocks on this pair until the flag is set,
                    // which keeps the pointee alive while it is used here.
                    let (lock, cvar) = &*pair;
                    let mut started = lock.lock().unwrap();
                    *started = true;
                    cvar.notify_one();
                },
            }
            continue;
        }

        // Event on a device descriptor: wake whoever is waiting, once.
        let id: usize = index.0.try_into().unwrap();
        if let Some(waker) = wakers[id - 1].take() {
            waker.wake();
        }
    }
}
/// Converts a C-style return code into a `Result`.
///
/// Negative values are reported as `Err` carrying the code unchanged; zero
/// and positive values are treated as success.
fn error(err: raw::c_int) -> Result<(), raw::c_int> {
    match err {
        code if code < 0 => Err(code),
        _ => Ok(()),
    }
}
/// Returns the process-wide `Mutex<Context>` stored in `SHARED_CONTEXT`.
///
/// The storage starts out uninitialised, so this must only be called after
/// `SHARED_CONTEXT.init(..)` has run (done inside the `INIT` closure of
/// `Device::new`).
fn context() -> &'static mut Mutex<Context> {
    unsafe {
        let slot = SHARED_CONTEXT.0.as_mut_ptr();
        &mut *slot
    }
}
/// Shared bookkeeping: device-ID allocation plus the pipe's write end.
struct Context {
    /// The next never-used ID to hand out.
    next: DeviceID,
    /// Released IDs waiting to be reused.
    garbage: Vec<DeviceID>,
    /// Write end of the pipe read by the hardware thread.
    sender: RawFd,
}

impl Context {
    /// Creates a context whose first fresh ID is 1 and which sends on `sender`.
    fn new(sender: RawFd) -> Self {
        Self {
            sender,
            garbage: Vec::new(),
            next: DeviceID(1),
        }
    }

    /// Hands out an ID, preferring the most recently released one over a
    /// fresh one.
    fn create_id(&mut self) -> DeviceID {
        match self.garbage.pop() {
            Some(recycled) => recycled,
            None => {
                let fresh = DeviceID(self.next.0);
                self.next.0 += 1;
                fresh
            }
        }
    }

    /// Releases `device_id`: the most recently created fresh ID simply rolls
    /// `next` back by one, any other ID is queued for reuse.
    fn delete_id(&mut self, device_id: DeviceID) {
        let newest = self.next.0 - 1;
        if device_id.0 != newest {
            self.garbage.push(device_id);
            return;
        }
        self.next.0 = newest;
    }
}
/// Lazily initialised storage for the global `Mutex<Context>`.
struct SharedContext(MaybeUninit<Mutex<Context>>);

impl SharedContext {
    /// Creates the storage in its uninitialised state (usable in a `static`).
    const fn new() -> Self {
        Self(MaybeUninit::uninit())
    }

    /// Stores `context`; any previously stored value is overwritten without
    /// being dropped.
    fn init(&mut self, context: Mutex<Context>) {
        *self = Self(MaybeUninit::new(context));
    }
}
/// Handle for a file descriptor registered with the hardware thread.
///
/// Created by `Device::new`; dropping it calls `old()`, which unregisters the
/// descriptor.
#[derive(Debug)]
pub struct Device {
// The watched descriptor, as passed to `Device::new`.
fd: RawFd,
// ID under which the descriptor is registered with the hardware thread.
device_id: DeviceID,
// Set to `true` once `old()` has run; `register_waker` asserts it is `false`.
old: bool,
}
impl Device {
    /// Sends one `Message` to the hardware thread by copying its bytes into
    /// the pipe.
    ///
    /// After a successful write the reader owns the value, so the local copy
    /// is forgotten rather than dropped; dropping it would release resources
    /// (for example a cloned `Waker`) that the reader is about to use.
    ///
    /// # Panics
    ///
    /// Panics with "Failed write to pipe" unless the whole message is written.
    fn send_message(write_fd: RawFd, message: Message) {
        let size = mem::size_of::<Message>();
        let bytes: *const Message = &message;
        // SAFETY: `bytes` points at a live `Message` of exactly `size` bytes.
        let written = unsafe { write(write_fd, bytes.cast(), size) };
        if written as usize != size {
            panic!("Failed write to pipe");
        }
        // Ownership has moved to the hardware thread together with the bytes.
        mem::forget(message);
    }

    /// Registers `fd` with the hardware thread, watching for `events`.
    ///
    /// The first call also creates the message pipe, initialises the shared
    /// context and spawns the hardware thread.
    ///
    /// # Panics
    ///
    /// Panics if the pipe cannot be created, if the context mutex is
    /// poisoned, or if the message cannot be written to the pipe.
    pub fn new(fd: RawFd, events: Watcher) -> Self {
        INIT.call_once(|| {
            let mut pipe = MaybeUninit::<[RawFd; 2]>::uninit();
            error(unsafe { pipe2(pipe.as_mut_ptr(), O_CLOEXEC) }).unwrap();
            // SAFETY: `pipe2` succeeded, so both descriptors were written.
            let [recver, sender] = unsafe { pipe.assume_init() };
            unsafe { SHARED_CONTEXT.init(Mutex::new(Context::new(sender))) }
            // The thread runs for the rest of the process; it is never joined.
            let _join = thread::spawn(move || hardware_thread(recver));
        });

        let mut context = context().lock().unwrap();
        let device_id = context.create_id();
        Self::send_message(
            context.sender,
            Message::Device(DeviceID(device_id.0), fd, events),
        );

        Device {
            fd,
            device_id,
            old: false,
        }
    }

    /// Asks the hardware thread to wake `waker` on the next event for this
    /// device.
    ///
    /// # Panics
    ///
    /// Panics if `old()` has already been called on this device, if the
    /// context mutex is poisoned, or if the message cannot be written.
    pub fn register_waker(&self, waker: &Waker) {
        assert_eq!(self.old, false);
        let context = context().lock().unwrap();
        Self::send_message(
            context.sender,
            Message::Waker(DeviceID(self.device_id.0), waker.clone()),
        );
    }

    /// Returns the file descriptor this device was created with.
    pub fn fd(&self) -> RawFd {
        self.fd
    }

    /// Unregisters the device from the hardware thread and releases its ID.
    ///
    /// Blocks until the hardware thread has confirmed the removal. Calling it
    /// again is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if a mutex is poisoned or the message cannot be written.
    #[allow(clippy::mutex_atomic)]
    pub fn old(&mut self) {
        if self.old {
            return;
        }
        self.old = true;

        // The context lock is held until the removal is confirmed, so the ID
        // cannot be handed out again before the descriptor is unregistered.
        let mut context = context().lock().unwrap();

        // Completion flag shared with the hardware thread through a raw
        // pointer; it stays alive because this function waits for the signal
        // before `pair` is dropped.
        let pair = Box::pin((Mutex::new(false), Condvar::new()));
        Self::send_message(context.sender, Message::Disconnect(self.fd, &*pair));
        context.delete_id(DeviceID(self.device_id.0));

        let (lock, cvar) = &*pair;
        let mut started = lock.lock().unwrap();
        while !*started {
            started = cvar.wait(started).unwrap();
        }
    }
}
impl Drop for Device {
fn drop(&mut self) {
self.old();
}
}