use std::cell::{Cell, RefCell};
use std::fmt::Debug;
use std::os::unix::io::AsFd;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{io, slice};
#[cfg(feature = "block_on")]
use std::future::Future;
use slab::Slab;
use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
use crate::sys::{Notifier, PollEvent};
use crate::{
AdditionalLifecycleEventsSet, EventDispatcher, InsertError, Poll, PostAction, Readiness, Token,
TokenFactory,
};
type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
#[cfg(target_pointer_width = "64")]
pub(crate) const MAX_SOURCES: u32 = 44;
#[cfg(target_pointer_width = "32")]
pub(crate) const MAX_SOURCES: u32 = 22;
#[cfg(target_pointer_width = "16")]
pub(crate) const MAX_SOURCES: u32 = 10;
#[cfg(target_pointer_width = "64")]
pub(crate) const MAX_SUBSOURCES: u32 = 20;
#[cfg(target_pointer_width = "32")]
pub(crate) const MAX_SUBSOURCES: u32 = 10;
#[cfg(target_pointer_width = "16")]
pub(crate) const MAX_SUBSOURCES: u32 = 6;
pub(crate) const MAX_SOURCES_TOTAL: usize = 1 << MAX_SOURCES;
pub(crate) const MAX_SUBSOURCES_TOTAL: usize = 1 << MAX_SUBSOURCES;
pub(crate) const MAX_SOURCES_MASK: usize = MAX_SOURCES_TOTAL - 1;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegistrationToken {
key: usize,
}
impl RegistrationToken {
pub(crate) fn new(key: usize) -> Self {
Self { key }
}
}
pub(crate) struct LoopInner<'l, Data> {
pub(crate) poll: RefCell<Poll>,
pub(crate) sources: RefCell<Slab<Rc<dyn EventDispatcher<Data> + 'l>>>,
pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
idles: RefCell<Vec<IdleCallback<'l, Data>>>,
pending_action: Cell<PostAction>,
}
pub struct LoopHandle<'l, Data> {
inner: Rc<LoopInner<'l, Data>>,
}
impl<'l, Data> std::fmt::Debug for LoopHandle<'l, Data> {
#[cfg_attr(feature = "nightly_coverage", coverage(off))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("LoopHandle { ... }")
}
}
impl<'l, Data> Clone for LoopHandle<'l, Data> {
#[cfg_attr(feature = "nightly_coverage", coverage(off))]
fn clone(&self) -> Self {
LoopHandle {
inner: self.inner.clone(),
}
}
}
impl<'l, Data> LoopHandle<'l, Data> {
pub fn insert_source<S, F>(
&self,
source: S,
callback: F,
) -> Result<RegistrationToken, InsertError<S>>
where
S: EventSource + 'l,
F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
{
let dispatcher = Dispatcher::new(source, callback);
self.register_dispatcher(dispatcher.clone())
.map_err(|error| InsertError {
error,
inserted: dispatcher.into_source_inner(),
})
}
#[cfg_attr(feature = "nightly_coverage", coverage(off))] pub fn register_dispatcher<S>(
&self,
dispatcher: Dispatcher<'l, S, Data>,
) -> crate::Result<RegistrationToken>
where
S: EventSource + 'l,
{
let mut sources = self.inner.sources.borrow_mut();
let mut poll = self.inner.poll.borrow_mut();
if sources.vacant_key() >= MAX_SOURCES_TOTAL {
return Err(crate::Error::IoError(std::io::Error::new(
std::io::ErrorKind::Other,
"Too many sources",
)));
}
let key = sources.insert(dispatcher.clone_as_event_dispatcher());
let ret = sources.get(key).unwrap().register(
&mut poll,
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
&mut TokenFactory::new(key),
);
if let Err(error) = ret {
sources.try_remove(key).expect("Source was just inserted?!");
return Err(error);
}
Ok(RegistrationToken { key })
}
pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
let mut opt_cb = Some(callback);
let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
if let Some(cb) = opt_cb.take() {
cb(data);
}
})));
self.inner.idles.borrow_mut().push(callback.clone());
Idle { callback }
}
pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
source.register(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
&mut TokenFactory::new(token.key),
)?;
}
Ok(())
}
pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if !source.reregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
&mut TokenFactory::new(token.key),
)? {
self.inner.pending_action.set(PostAction::Reregister);
}
}
Ok(())
}
pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if !source.unregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
*token,
)? {
self.inner.pending_action.set(PostAction::Disable);
}
}
Ok(())
}
pub fn remove(&self, token: RegistrationToken) {
if let Some(source) = self.inner.sources.borrow_mut().try_remove(token.key) {
if let Err(e) = source.unregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
token,
) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",
e
);
}
}
}
pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
crate::io::Async::new(self.inner.clone(), fd)
}
}
pub struct EventLoop<'l, Data> {
handle: LoopHandle<'l, Data>,
signals: Arc<Signals>,
synthetic_events: Vec<PollEvent>,
}
impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> {
#[cfg_attr(feature = "nightly_coverage", coverage(off))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("EventLoop { ... }")
}
}
struct Signals {
stop: AtomicBool,
#[cfg(feature = "block_on")]
future_ready: AtomicBool,
}
impl<'l, Data> EventLoop<'l, Data> {
pub fn try_new() -> crate::Result<Self> {
let poll = Poll::new()?;
let handle = LoopHandle {
inner: Rc::new(LoopInner {
poll: RefCell::new(poll),
sources: RefCell::new(Slab::new()),
idles: RefCell::new(Vec::new()),
pending_action: Cell::new(PostAction::Continue),
sources_with_additional_lifecycle_events: Default::default(),
}),
};
Ok(EventLoop {
handle,
signals: Arc::new(Signals {
stop: AtomicBool::new(false),
#[cfg(feature = "block_on")]
future_ready: AtomicBool::new(false),
}),
synthetic_events: vec![],
})
}
pub fn handle(&self) -> LoopHandle<'l, Data> {
self.handle.clone()
}
fn dispatch_events(
&mut self,
mut timeout: Option<Duration>,
data: &mut Data,
) -> crate::Result<()> {
let now = Instant::now();
{
let mut extra_lifecycle_sources = self
.handle
.inner
.sources_with_additional_lifecycle_events
.borrow_mut();
let sources = &self.handle.inner.sources.borrow();
for source in &mut *extra_lifecycle_sources.values {
if let Some(disp) = sources.get(source.key) {
if let Some((readiness, token)) = disp.before_sleep()? {
timeout = Some(Duration::ZERO);
self.synthetic_events.push(PollEvent { readiness, token });
}
} else {
unreachable!()
}
}
}
let events = {
let poll = self.handle.inner.poll.borrow();
loop {
let result = poll.poll(timeout);
match result {
Ok(events) => break events,
Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
if let Some(to) = timeout {
let elapsed = now.elapsed();
if elapsed >= to {
return Ok(());
} else {
timeout = Some(to - elapsed);
}
}
}
Err(err) => return Err(err),
};
}
};
{
let mut extra_lifecycle_sources = self
.handle
.inner
.sources_with_additional_lifecycle_events
.borrow_mut();
if !extra_lifecycle_sources.values.is_empty() {
for source in &mut *extra_lifecycle_sources.values {
if let Some(disp) = self.handle.inner.sources.borrow().get(source.key) {
let iter = EventIterator {
inner: events.iter(),
registration_token: *source,
};
disp.before_handle_events(iter);
} else {
unreachable!()
}
}
}
}
for event in self.synthetic_events.drain(..).chain(events) {
let registroken_token = event.token.key & MAX_SOURCES_MASK;
let opt_disp = self
.handle
.inner
.sources
.borrow()
.get(registroken_token)
.cloned();
if let Some(disp) = opt_disp {
let mut ret = disp.process_events(event.readiness, event.token, data)?;
let pending_action = self
.handle
.inner
.pending_action
.replace(PostAction::Continue);
if let PostAction::Continue = ret {
ret = pending_action;
}
match ret {
PostAction::Reregister => {
disp.reregister(
&mut self.handle.inner.poll.borrow_mut(),
&mut self
.handle
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
&mut TokenFactory::new(event.token.key),
)?;
}
PostAction::Disable => {
disp.unregister(
&mut self.handle.inner.poll.borrow_mut(),
&mut self
.handle
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
RegistrationToken::new(registroken_token),
)?;
}
PostAction::Remove => {
self.handle
.inner
.sources
.borrow_mut()
.remove(event.token.key);
}
PostAction::Continue => {}
}
if !self
.handle
.inner
.sources
.borrow()
.contains(registroken_token)
{
let mut poll = self.handle.inner.poll.borrow_mut();
if let Err(e) = disp.unregister(
&mut poll,
&mut self
.handle
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
RegistrationToken::new(registroken_token),
) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",
e
);
}
}
} else {
log::warn!(
"[calloop] Received an event for non-existence source: {:?}",
event.token.key
);
}
}
Ok(())
}
fn dispatch_idles(&mut self, data: &mut Data) {
let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
for idle in idles {
idle.borrow_mut().dispatch(data);
}
}
pub fn dispatch<D: Into<Option<Duration>>>(
&mut self,
timeout: D,
data: &mut Data,
) -> crate::Result<()> {
self.dispatch_events(timeout.into(), data)?;
self.dispatch_idles(data);
Ok(())
}
pub fn get_signal(&self) -> LoopSignal {
LoopSignal {
signal: self.signals.clone(),
notifier: self.handle.inner.poll.borrow().notifier(),
}
}
pub fn run<F, D: Into<Option<Duration>>>(
&mut self,
timeout: D,
data: &mut Data,
mut cb: F,
) -> crate::Result<()>
where
F: FnMut(&mut Data),
{
let timeout = timeout.into();
self.signals.stop.store(false, Ordering::Release);
while !self.signals.stop.load(Ordering::Acquire) {
self.dispatch(timeout, data)?;
cb(data);
}
Ok(())
}
#[cfg(feature = "block_on")]
pub fn block_on<R>(
&mut self,
future: impl Future<Output = R>,
data: &mut Data,
mut cb: impl FnMut(&mut Data),
) -> crate::Result<Option<R>> {
use std::task::{Context, Poll, Wake, Waker};
struct EventLoopWaker(LoopSignal);
impl Wake for EventLoopWaker {
fn wake(self: Arc<Self>) {
self.0.signal.future_ready.store(true, Ordering::Release);
self.0.notifier.notify().ok();
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.signal.future_ready.store(true, Ordering::Release);
self.0.notifier.notify().ok();
}
}
pin_utils::pin_mut!(future);
let waker = {
let handle = EventLoopWaker(self.get_signal());
Waker::from(Arc::new(handle))
};
let mut context = Context::from_waker(&waker);
let mut output = None;
self.signals.stop.store(false, Ordering::Release);
self.signals.future_ready.store(true, Ordering::Release);
while !self.signals.stop.load(Ordering::Acquire) {
if self.signals.future_ready.swap(false, Ordering::AcqRel) {
if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
output = Some(result);
break;
}
}
self.dispatch_events(None, data)?;
self.dispatch_idles(data);
cb(data);
}
Ok(output)
}
}
#[derive(Clone, Debug)]
pub struct EventIterator<'a> {
inner: slice::Iter<'a, PollEvent>,
registration_token: RegistrationToken,
}
impl<'a> Iterator for EventIterator<'a> {
type Item = (Readiness, Token);
fn next(&mut self) -> Option<Self::Item> {
for next in self.inner.by_ref() {
if next.token.key & MAX_SOURCES_MASK == self.registration_token.key {
return Some((next.readiness, next.token));
}
}
None
}
}
#[derive(Clone)]
pub struct LoopSignal {
signal: Arc<Signals>,
notifier: Notifier,
}
impl std::fmt::Debug for LoopSignal {
#[cfg_attr(feature = "nightly_coverage", coverage(off))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("LoopSignal { ... }")
}
}
impl LoopSignal {
pub fn stop(&self) {
self.signal.stop.store(true, Ordering::Release);
}
pub fn wakeup(&self) {
self.notifier.notify().ok();
}
}
#[cfg(test)]
mod tests {
use std::{cell::Cell, rc::Rc, time::Duration};
use crate::{
channel::{channel, Channel},
generic::Generic,
ping::*,
Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness,
RegistrationToken, Token, TokenFactory,
};
use super::EventLoop;
#[test]
fn dispatch_idle() {
let mut event_loop = EventLoop::try_new().unwrap();
let mut dispatched = false;
event_loop.handle().insert_idle(|d| {
*d = true;
});
event_loop
.dispatch(Some(Duration::ZERO), &mut dispatched)
.unwrap();
assert!(dispatched);
}
#[test]
fn cancel_idle() {
let mut event_loop = EventLoop::try_new().unwrap();
let mut dispatched = false;
let handle = event_loop.handle();
let idle = handle.insert_idle(move |d| {
*d = true;
});
idle.cancel();
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
}
#[test]
fn wakeup() {
let mut event_loop = EventLoop::try_new().unwrap();
let signal = event_loop.get_signal();
::std::thread::spawn(move || {
::std::thread::sleep(Duration::from_millis(500));
signal.wakeup();
});
event_loop.dispatch(None, &mut ()).unwrap();
}
#[test]
fn wakeup_stop() {
let mut event_loop = EventLoop::try_new().unwrap();
let signal = event_loop.get_signal();
::std::thread::spawn(move || {
::std::thread::sleep(Duration::from_millis(500));
signal.stop();
signal.wakeup();
});
event_loop.run(None, &mut (), |_| {}).unwrap();
}
#[test]
fn additional_events() {
let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
let mut lock = Lock {
lock: Rc::new((
Cell::new(false),
Cell::new(0),
Cell::new(0),
)),
};
let (sender, channel) = channel();
let token = event_loop
.handle()
.insert_source(
LockingSource {
channel,
lock: lock.clone(),
},
|_, _, lock| {
lock.lock();
lock.unlock();
},
)
.unwrap();
sender.send(()).unwrap();
event_loop.dispatch(None, &mut lock).unwrap();
assert_eq!(lock.lock.1.get(), 2);
assert_eq!(lock.lock.2.get(), 1);
event_loop.handle().disable(&token).unwrap();
event_loop
.dispatch(Some(Duration::ZERO), &mut lock)
.unwrap();
assert_eq!(lock.lock.1.get(), 2);
event_loop.handle().enable(&token).unwrap();
event_loop
.dispatch(Some(Duration::ZERO), &mut lock)
.unwrap();
assert_eq!(lock.lock.1.get(), 3);
event_loop.handle().remove(token);
event_loop
.dispatch(Some(Duration::ZERO), &mut lock)
.unwrap();
assert_eq!(lock.lock.1.get(), 3);
assert_eq!(lock.lock.2.get(), 1);
#[derive(Clone)]
struct Lock {
lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
}
impl Lock {
fn lock(&self) {
if self.lock.0.get() {
panic!();
}
self.lock.1.set(self.lock.1.get() + 1);
self.lock.0.set(true)
}
fn unlock(&self) {
if !self.lock.0.get() {
panic!();
}
self.lock.0.set(false);
}
}
struct LockingSource {
channel: Channel<()>,
lock: Lock,
}
impl EventSource for LockingSource {
type Event = <Channel<()> as EventSource>::Event;
type Metadata = <Channel<()> as EventSource>::Metadata;
type Ret = <Channel<()> as EventSource>::Ret;
type Error = <Channel<()> as EventSource>::Error;
fn process_events<F>(
&mut self,
readiness: Readiness,
token: Token,
callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.channel.process_events(readiness, token, callback)
}
fn register(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.channel.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.channel.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.channel.unregister(poll)
}
const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
self.lock.lock();
Ok(None)
}
fn before_handle_events(&mut self, events: EventIterator) {
let events_count = events.count();
let lock = &self.lock.lock;
lock.2.set(lock.2.get() + events_count as u32);
self.lock.unlock();
}
}
}
#[test]
fn default_additional_events() {
let (sender, channel) = channel();
let mut test_source = NoopWithDefaultHandlers { channel };
let mut event_loop = EventLoop::try_new().unwrap();
event_loop
.handle()
.insert_source(Box::new(&mut test_source), |_, _, _| {})
.unwrap();
sender.send(()).unwrap();
event_loop.dispatch(None, &mut ()).unwrap();
struct NoopWithDefaultHandlers {
channel: Channel<()>,
}
impl EventSource for NoopWithDefaultHandlers {
type Event = <Channel<()> as EventSource>::Event;
type Metadata = <Channel<()> as EventSource>::Metadata;
type Ret = <Channel<()> as EventSource>::Ret;
type Error = <Channel<()> as EventSource>::Error;
fn process_events<F>(
&mut self,
readiness: Readiness,
token: Token,
callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.channel.process_events(readiness, token, callback)
}
fn register(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.channel.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.channel.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.channel.unregister(poll)
}
const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
}
}
#[test]
fn additional_events_synthetic() {
let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
let mut lock = Lock {
lock: Rc::new(Cell::new(false)),
};
event_loop
.handle()
.insert_source(
InstantWakeupLockingSource {
lock: lock.clone(),
token: None,
},
|_, _, lock| {
lock.lock();
lock.unlock();
},
)
.unwrap();
event_loop.dispatch(None, &mut lock).unwrap();
#[derive(Clone)]
struct Lock {
lock: Rc<Cell<bool>>,
}
impl Lock {
fn lock(&self) {
if self.lock.get() {
panic!();
}
self.lock.set(true)
}
fn unlock(&self) {
if !self.lock.get() {
panic!();
}
self.lock.set(false);
}
}
struct InstantWakeupLockingSource {
lock: Lock,
token: Option<Token>,
}
impl EventSource for InstantWakeupLockingSource {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = <Channel<()> as EventSource>::Error;
fn process_events<F>(
&mut self,
_: Readiness,
token: Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
assert_eq!(token, self.token.unwrap());
callback((), &mut ());
Ok(PostAction::Continue)
}
fn register(
&mut self,
_: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.token = Some(token_factory.token());
Ok(())
}
fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
unreachable!()
}
fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
unreachable!()
}
const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
self.lock.lock();
Ok(Some((Readiness::EMPTY, self.token.unwrap())))
}
fn before_handle_events(&mut self, _: EventIterator) {
self.lock.unlock();
}
}
}
#[test]
fn insert_bad_source() {
use std::os::unix::io::FromRawFd;
let event_loop = EventLoop::<()>::try_new().unwrap();
let fd = unsafe { std::os::unix::io::OwnedFd::from_raw_fd(420) };
let ret = event_loop.handle().insert_source(
crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level),
|_, _, _| Ok(PostAction::Continue),
);
assert!(ret.is_err());
}
#[test]
fn insert_source_no_interest() {
use rustix::pipe::pipe;
let (read, _write) = pipe().unwrap();
let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));
let event_loop = EventLoop::<()>::try_new().unwrap();
let handle = event_loop.handle();
let ret = handle.register_dispatcher(dispatcher.clone());
if let Ok(token) = ret {
handle.remove(token);
} else {
panic!();
}
}
#[test]
fn disarm_rearm() {
let mut event_loop = EventLoop::<bool>::try_new().unwrap();
let (ping, ping_source) = make_ping().unwrap();
let ping_token = event_loop
.handle()
.insert_source(ping_source, |(), &mut (), dispatched| {
*dispatched = true;
})
.unwrap();
ping.ping();
let mut dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);
ping.ping();
event_loop.handle().disable(&ping_token).unwrap();
let mut dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
event_loop.handle().enable(&ping_token).unwrap();
let mut dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);
}
#[test]
fn multiple_tokens() {
struct DoubleSource {
ping1: PingSource,
ping2: PingSource,
}
impl crate::EventSource for DoubleSource {
type Event = u32;
type Metadata = ();
type Ret = ();
type Error = PingError;
fn process_events<F>(
&mut self,
readiness: Readiness,
token: Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.ping1
.process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
self.ping2
.process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
Ok(PostAction::Continue)
}
fn register(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.ping1.register(poll, token_factory)?;
self.ping2.register(poll, token_factory)?;
Ok(())
}
fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.ping1.reregister(poll, token_factory)?;
self.ping2.reregister(poll, token_factory)?;
Ok(())
}
fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.ping1.unregister(poll)?;
self.ping2.unregister(poll)?;
Ok(())
}
}
let mut event_loop = EventLoop::<u32>::try_new().unwrap();
let (ping1, source1) = make_ping().unwrap();
let (ping2, source2) = make_ping().unwrap();
let source = DoubleSource {
ping1: source1,
ping2: source2,
};
event_loop
.handle()
.insert_source(source, |i, _, d| {
eprintln!("Dispatching {}", i);
*d += i
})
.unwrap();
let mut dispatched = 0;
ping1.ping();
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert_eq!(dispatched, 1);
dispatched = 0;
ping2.ping();
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert_eq!(dispatched, 2);
dispatched = 0;
ping1.ping();
ping2.ping();
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert_eq!(dispatched, 3);
}
#[test]
fn change_interests() {
use rustix::io::write;
use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};
let mut event_loop = EventLoop::<bool>::try_new().unwrap();
let (sock1, sock2) = socketpair(
AddressFamily::UNIX,
SocketType::STREAM,
SocketFlags::empty(),
None, )
.unwrap();
let source = Generic::new(sock1, Interest::READ, Mode::Level);
let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
*dispatched = true;
let mut buf = [0u8; 32];
loop {
match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
Ok(0) => break, Ok(_) => {}
Err(e) => {
let e: std::io::Error = e.into();
if e.kind() == std::io::ErrorKind::WouldBlock {
break;
} else {
return Err(e);
}
}
}
}
Ok(PostAction::Continue)
});
let sock_token_1 = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();
let mut dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
write(&sock2, &[1, 2, 3]).unwrap();
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
dispatcher.as_source_mut().interest = Interest::WRITE;
event_loop.handle().update(&sock_token_1).unwrap();
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);
dispatcher.as_source_mut().interest = Interest::READ;
event_loop.handle().update(&sock_token_1).unwrap();
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
}
#[test]
fn kill_source() {
let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();
let handle = event_loop.handle();
let (ping, ping_source) = make_ping().unwrap();
let ping_token = event_loop
.handle()
.insert_source(ping_source, move |(), &mut (), opt_src| {
if let Some(src) = opt_src.take() {
handle.remove(src);
}
})
.unwrap();
ping.ping();
let mut opt_src = Some(ping_token);
event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();
assert!(opt_src.is_none());
}
#[test]
fn non_static_data() {
use std::sync::mpsc;
let (sender, receiver) = mpsc::channel();
{
struct RefSender<'a>(&'a mpsc::Sender<()>);
let mut ref_sender = RefSender(&sender);
let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
let (ping, ping_source) = make_ping().unwrap();
let _ping_token = event_loop
.handle()
.insert_source(ping_source, |_, _, ref_sender| {
ref_sender.0.send(()).unwrap();
})
.unwrap();
ping.ping();
event_loop
.dispatch(Duration::ZERO, &mut ref_sender)
.unwrap();
}
receiver.recv().unwrap();
drop(sender);
}
#[cfg(feature = "block_on")]
#[test]
fn block_on_test() {
use crate::sources::timer::TimeoutFuture;
use std::time::Duration;
let mut evl = EventLoop::<()>::try_new().unwrap();
let mut data = 22;
let timeout = {
let data = &mut data;
let evl_handle = evl.handle();
async move {
TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
*data = 32;
11
}
};
let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
assert_eq!(result, Some(11));
assert_eq!(data, 32);
}
#[cfg(feature = "block_on")]
#[test]
fn block_on_early_cancel() {
use crate::sources::timer;
use std::time::Duration;
let mut evl = EventLoop::<()>::try_new().unwrap();
let mut data = 22;
let timeout = {
let data = &mut data;
let evl_handle = evl.handle();
async move {
timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
*data = 32;
11
}
};
let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
let handle = evl.get_signal();
let _timer_token = evl
.handle()
.insert_source(timer_source, move |_, _, _| {
handle.stop();
timer::TimeoutAction::Drop
})
.unwrap();
let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
assert_eq!(result, None);
assert_eq!(data, 22);
}
struct DummySource;
impl crate::EventSource for DummySource {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = crate::Error;
fn process_events<F>(
&mut self,
_: Readiness,
_: Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
callback((), &mut ());
Ok(PostAction::Continue)
}
fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
Ok(())
}
fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
Ok(())
}
fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
Ok(())
}
}
}