use polling::Poller;
use std::{
borrow,
marker::PhantomData,
ops,
os::unix::io::{AsFd, AsRawFd, BorrowedFd},
sync::Arc,
};
use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
/// Adapter that provides an [`AsFd`] implementation for a type which only
/// implements [`AsRawFd`].
///
/// The wrapper dereferences (immutably and mutably) to the wrapped value, so
/// the inner object can be used as usual through it.
#[derive(Debug)]
pub struct FdWrapper<T: AsRawFd>(T);

impl<T: AsRawFd> FdWrapper<T> {
    /// Wraps `inner` so that it can be used where an [`AsFd`] type is needed.
    ///
    /// # Safety
    ///
    /// [`AsFd::as_fd`] on the wrapper borrows whatever `inner.as_raw_fd()`
    /// returns without any validation. The caller must guarantee that this
    /// value is a valid, open file descriptor for as long as any borrow
    /// obtained from the wrapper is in use.
    pub unsafe fn new(inner: T) -> Self {
        Self(inner)
    }
}

impl<T: AsRawFd> ops::Deref for FdWrapper<T> {
    type Target = T;

    /// Gives shared access to the wrapped value.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
    /// Gives exclusive access to the wrapped value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T: AsRawFd> AsFd for FdWrapper<T> {
    /// Borrows the raw descriptor reported by the wrapped value.
    ///
    /// The returned borrow is tied to the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        // SAFETY: the contract of `FdWrapper::new` makes the caller
        // responsible for the descriptor being valid and open while the
        // wrapper is borrowed.
        unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
    }
}
/// Wrapper that hands out its inner value freely by shared reference, but
/// only through an `unsafe` method by mutable reference.
///
/// NOTE(review): judging by the name, the intent is presumably to stop safe
/// code from dropping or replacing the wrapped I/O object while it is still
/// registered somewhere; confirm against the crate documentation.
#[derive(Debug)]
pub struct NoIoDrop<T>(T);

impl<T> NoIoDrop<T> {
    /// Returns a mutable reference to the wrapped value.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not visible in this file.
    /// Presumably the caller must not use the reference to drop or replace
    /// the underlying I/O object; confirm before relying on it.
    pub unsafe fn get_mut(&mut self) -> &mut T {
        let Self(inner) = self;
        inner
    }
}

impl<T> AsRef<T> for NoIoDrop<T> {
    /// Shared access to the wrapped value.
    fn as_ref(&self) -> &T {
        let Self(inner) = self;
        inner
    }
}

impl<T> borrow::Borrow<T> for NoIoDrop<T> {
    /// Shared access to the wrapped value.
    fn borrow(&self) -> &T {
        let Self(inner) = self;
        inner
    }
}

impl<T> ops::Deref for NoIoDrop<T> {
    type Target = T;

    /// Shared access to the wrapped value. There is deliberately no matching
    /// `DerefMut` impl in this file; mutable access goes through `get_mut`.
    fn deref(&self) -> &T {
        let Self(inner) = self;
        inner
    }
}

impl<T: AsFd> AsFd for NoIoDrop<T> {
    /// Forwards to the wrapped value's own `as_fd`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let Self(inner) = self;
        inner.as_fd()
    }
}
/// Event source built around an arbitrary [`AsFd`] object.
///
/// `F` is the wrapped I/O object; `E` is the error type used by the
/// `EventSource` implementation (it defaults to `std::io::Error`).
#[derive(Debug)]
pub struct Generic<F: AsFd, E = std::io::Error> {
/// The wrapped I/O object. It is `Some` from construction until it is taken
/// out by `unwrap` or by the `Drop` implementation.
file: Option<NoIoDrop<F>>,
/// Interest passed to the poll when the source is (re)registered.
pub interest: Interest,
/// Mode passed to the poll when the source is (re)registered.
pub mode: Mode,
/// Poller handle stored after a successful `register` and cleared by
/// `unregister`; `unwrap` and `Drop` use it to delete the fd from the poller.
poller: Option<Arc<Poller>>,
/// Token obtained at the last successful (re)registration; `process_events`
/// ignores events carrying any other token.
token: Option<Token>,
/// Marker tying the otherwise unused error type `E` to the struct.
_error_type: PhantomData<E>,
}
impl<F: AsFd> Generic<F, std::io::Error> {
    /// Builds a source around `file` with the given `interest` and `mode`,
    /// using `std::io::Error` as the error type.
    ///
    /// The source starts out unregistered: it holds neither a token nor a
    /// poller handle.
    pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> {
        Self::new_with_error::<std::io::Error>(file, interest, mode)
    }

    /// Same as [`Generic::new`], but lets the caller choose the error type `E`
    /// of the resulting source.
    pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> {
        let file = Some(NoIoDrop(file));
        Generic {
            file,
            interest,
            mode,
            poller: None,
            token: None,
            _error_type: PhantomData,
        }
    }
}
impl<F: AsFd, E> Generic<F, E> {
    /// Consumes the source and returns the wrapped I/O object.
    ///
    /// If a poller handle is still held, the fd is deleted from that poller
    /// first; a failure of that deletion is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped object has already been taken out.
    pub fn unwrap(mut self) -> F {
        let file = self.file.take().unwrap().0;
        if let Some(poller) = self.poller.take() {
            // Best effort: there is nothing useful to do if deletion fails.
            let _ = poller.delete(file.as_fd());
        }
        file
    }

    /// Returns a shared reference to the wrapped I/O object.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped object has already been taken out.
    pub fn get_ref(&self) -> &F {
        let NoIoDrop(inner) = self.file.as_ref().unwrap();
        inner
    }

    /// Returns a mutable reference to the wrapped I/O object.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not spelled out in this file.
    /// Presumably the caller must not drop or replace the underlying I/O
    /// object through this reference while the source may be registered.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped object has already been taken out.
    pub unsafe fn get_mut(&mut self) -> &mut F {
        let NoIoDrop(inner) = self.file.as_mut().unwrap();
        inner
    }
}
impl<F: AsFd, E> Drop for Generic<F, E> {
    /// Deletes the fd from the poller when both the wrapped object and a
    /// poller handle are still held; any deletion error is ignored.
    fn drop(&mut self) {
        // Both fields are taken unconditionally, whether or not both are set.
        let held = (self.file.take(), self.poller.take());
        if let (Some(file), Some(poller)) = held {
            // Best effort: a destructor has no way to report the failure.
            let _ = poller.delete(file.as_fd());
        }
    }
}
impl<F, E> EventSource for Generic<F, E>
where
F: AsFd,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Event = Readiness;
type Metadata = NoIoDrop<F>;
type Ret = Result<PostAction, E>;
type Error = E;
fn process_events<C>(
&mut self,
readiness: Readiness,
token: Token,
mut callback: C,
) -> Result<PostAction, Self::Error>
where
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
if self.token != Some(token) {
return Ok(PostAction::Continue);
}
callback(readiness, self.file.as_mut().unwrap())
}
fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
let token = token_factory.token();
unsafe {
poll.register(
&self.file.as_ref().unwrap().0,
self.interest,
self.mode,
token,
)?;
}
self.poller = Some(poll.poller().clone());
self.token = Some(token);
Ok(())
}
fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
let token = token_factory.token();
poll.reregister(
&self.file.as_ref().unwrap().0,
self.interest,
self.mode,
token,
)?;
self.token = Some(token);
Ok(())
}
fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
poll.unregister(&self.file.as_ref().unwrap().0)?;
self.poller = None;
self.token = None;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::Generic;
    use crate::{Dispatcher, Interest, Mode, PostAction};

    /// A readable `Generic` source only fires once data has been written to
    /// the other end of the socket pair.
    #[cfg(unix)]
    #[test]
    fn dispatch_unix() {
        use std::os::unix::net::UnixStream;

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (mut tx, rx) = UnixStream::pair().unwrap();
        let generic = Generic::new(rx, Interest::READ, Mode::Level);
        let mut dispatched = false;

        let _generic_token = handle
            .insert_source(generic, move |readiness, file, d| {
                assert!(readiness.readable);
                assert!(!readiness.writable);
                let mut buffer = vec![0; 10];
                let ret = (&**file).read(&mut buffer).unwrap();
                assert_eq!(ret, 6);
                assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
                *d = true;
                Ok(PostAction::Continue)
            })
            .unwrap();

        // Nothing has been written yet, so the callback must not run.
        event_loop
            .dispatch(Some(::std::time::Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
        assert_eq!(ret, 6);
        tx.flush().unwrap();

        // Data is now pending, so the callback runs.
        event_loop
            .dispatch(Some(::std::time::Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(dispatched);
    }

    /// A removed source receives no events, and the same `Generic` can be
    /// inserted again afterwards and then receives the pending data.
    #[cfg(unix)]
    #[test]
    fn register_deregister_unix() {
        use std::os::unix::net::UnixStream;

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (mut tx, rx) = UnixStream::pair().unwrap();
        let generic = Generic::new(rx, Interest::READ, Mode::Level);
        let dispatcher = Dispatcher::new(generic, move |_, _, d| {
            *d = true;
            Ok(PostAction::Continue)
        });
        let mut dispatched = false;

        let generic_token = handle.register_dispatcher(dispatcher.clone()).unwrap();

        // Nothing has been written yet, so the callback must not run.
        event_loop
            .dispatch(Some(::std::time::Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        // Remove the source, then make the socket readable.
        event_loop.handle().remove(generic_token);
        let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
        assert_eq!(ret, 6);
        tx.flush().unwrap();

        // The source is no longer in the loop, so the callback must not run.
        event_loop
            .dispatch(Some(::std::time::Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        // Insert the same source again with a new callback.
        let generic = dispatcher.into_source_inner();
        let _generic_token = handle
            .insert_source(generic, move |readiness, file, d| {
                assert!(readiness.readable);
                assert!(!readiness.writable);
                let mut buffer = vec![0; 10];
                let ret = (&**file).read(&mut buffer).unwrap();
                assert_eq!(ret, 6);
                assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
                *d = true;
                Ok(PostAction::Continue)
            })
            .unwrap();

        // The data written earlier is still pending and is now delivered.
        event_loop
            .dispatch(Some(::std::time::Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(dispatched);
    }

    /// Inserting a second source for the same fd fails, and dropping the
    /// failed result leaves the first source usable (`update` still works).
    #[cfg(target_os = "linux")]
    #[test]
    fn duplicate_insert() {
        use std::os::unix::{
            io::{AsFd, BorrowedFd},
            net::UnixStream,
        };

        let event_loop = crate::EventLoop::<()>::try_new().unwrap();
        let handle = event_loop.handle();
        let (_, rx) = UnixStream::pair().unwrap();

        // `Rc` wrapper so two sources can share the very same descriptor.
        struct RcFd<T> {
            rc: std::rc::Rc<T>,
        }
        impl<T: AsFd> AsFd for RcFd<T> {
            fn as_fd(&self) -> BorrowedFd<'_> {
                self.rc.as_fd()
            }
        }

        let rx = std::rc::Rc::new(rx);
        let token = handle
            .insert_source(
                Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
                |_, _, _| Ok(PostAction::Continue),
            )
            .unwrap();

        // Second insertion of the same fd is rejected.
        let ret = handle.insert_source(
            Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
            |_, _, _| Ok(PostAction::Continue),
        );
        assert!(ret.is_err());
        std::mem::drop(ret);

        // The first source is still registered and can be updated.
        handle.update(&token).unwrap();
    }
}