use std::cell::{Cell, RefCell};
use std::fmt::Debug;
use std::io;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
#[cfg(feature = "block_on")]
use std::future::Future;
use io_lifetimes::AsFd;
use slab::Slab;
use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
use crate::sys::Notifier;
use crate::{EventDispatcher, InsertError, Poll, PostAction, TokenFactory};
/// Shared, interior-mutable handle to an idle callback queued in `LoopInner::idles`.
type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
// The constants below split a `usize` token key into two bit fields. For every supported
// pointer width, `MAX_SOURCES + MAX_SUBSOURCES` equals the width of `usize` (44 + 20, 22 + 10,
// 10 + 6).
/// Number of low bits of a token key that carry the source's registration key (64-bit targets).
#[cfg(target_pointer_width = "64")]
pub(crate) const MAX_SOURCES: u32 = 44;
/// Number of registration-key bits on 32-bit targets.
#[cfg(target_pointer_width = "32")]
pub(crate) const MAX_SOURCES: u32 = 22;
/// Number of registration-key bits on 16-bit targets.
#[cfg(target_pointer_width = "16")]
pub(crate) const MAX_SOURCES: u32 = 10;
/// Number of bits left over for the sub-source part of a token key (64-bit targets).
// NOTE(review): presumably these are the high bits filled in by `TokenFactory`, which is
// defined outside this file — confirm against its implementation.
#[cfg(target_pointer_width = "64")]
pub(crate) const MAX_SUBSOURCES: u32 = 20;
/// Number of sub-source bits on 32-bit targets.
#[cfg(target_pointer_width = "32")]
pub(crate) const MAX_SUBSOURCES: u32 = 10;
/// Number of sub-source bits on 16-bit targets.
#[cfg(target_pointer_width = "16")]
pub(crate) const MAX_SUBSOURCES: u32 = 6;
/// Exclusive upper bound on registration keys; `register_dispatcher` refuses keys at or above it.
pub(crate) const MAX_SOURCES_TOTAL: usize = 1 << MAX_SOURCES;
/// `2^MAX_SUBSOURCES`.
pub(crate) const MAX_SUBSOURCES_TOTAL: usize = 1 << MAX_SUBSOURCES;
/// Mask keeping only the registration-key bits of a token key.
pub(crate) const MAX_SOURCES_MASK: usize = MAX_SOURCES_TOTAL - 1;
/// Opaque token identifying a source registered in the event loop.
///
/// It is returned by `LoopHandle::insert_source` and `LoopHandle::register_dispatcher`, and is
/// passed back to `enable`, `update`, `disable` and `remove`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegistrationToken {
/// Key of the source in the `LoopInner::sources` slab.
key: usize,
}
/// State shared (through an `Rc`) between the `EventLoop` and all of its `LoopHandle`s.
pub(crate) struct LoopInner<'l, Data> {
/// The polling backend the sources register themselves with.
pub(crate) poll: RefCell<Poll>,
/// Registered sources, indexed by the key stored in their `RegistrationToken`.
pub(crate) sources: RefCell<Slab<Rc<dyn EventDispatcher<Data> + 'l>>>,
/// Idle callbacks queued by `insert_idle`; drained by `EventLoop::dispatch_idles`.
idles: RefCell<Vec<IdleCallback<'l, Data>>>,
/// Action requested through `update`/`disable` that could not be applied immediately;
/// `EventLoop::dispatch_events` consumes it right after a source's `process_events` returns.
pending_action: Cell<PostAction>,
}
/// Cloneable handle to an event loop, used to insert, update and remove sources and idle
/// callbacks.
///
/// All clones share the same `LoopInner` through an `Rc`.
pub struct LoopHandle<'l, Data> {
/// Shared loop state.
inner: Rc<LoopInner<'l, Data>>,
}
impl<Data> std::fmt::Debug for LoopHandle<'_, Data> {
    /// Writes a fixed placeholder; the handle's internals are not printed.
    #[cfg_attr(feature = "nightly_coverage", no_coverage)]
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "LoopHandle {{ ... }}")
    }
}
impl<'l, Data> Clone for LoopHandle<'l, Data> {
#[cfg_attr(feature = "nightly_coverage", no_coverage)]
fn clone(&self) -> Self {
LoopHandle {
inner: self.inner.clone(),
}
}
}
impl<'l, Data> LoopHandle<'l, Data> {
pub fn insert_source<S, F>(
&self,
source: S,
callback: F,
) -> Result<RegistrationToken, InsertError<S>>
where
S: EventSource + 'l,
F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
{
let dispatcher = Dispatcher::new(source, callback);
self.register_dispatcher(dispatcher.clone())
.map_err(|error| InsertError {
error,
inserted: dispatcher.into_source_inner(),
})
}
#[cfg_attr(feature = "nightly_coverage", no_coverage)] pub fn register_dispatcher<S>(
&self,
dispatcher: Dispatcher<'l, S, Data>,
) -> crate::Result<RegistrationToken>
where
S: EventSource + 'l,
{
let mut sources = self.inner.sources.borrow_mut();
let mut poll = self.inner.poll.borrow_mut();
if sources.vacant_key() >= MAX_SOURCES_TOTAL {
return Err(crate::Error::IoError(std::io::Error::new(
std::io::ErrorKind::Other,
"Too many sources",
)));
}
let key = sources.insert(dispatcher.clone_as_event_dispatcher());
let ret = sources
.get(key)
.unwrap()
.register(&mut poll, &mut TokenFactory::new(key));
if let Err(error) = ret {
sources.try_remove(key).expect("Source was just inserted?!");
return Err(error);
}
Ok(RegistrationToken { key })
}
/// Queues `callback` in the loop's idle list, which `EventLoop::dispatch_idles` drains.
///
/// The returned `Idle` shares the stored callback with the queue.
pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
    // Wrap the `FnOnce` so it can be called through a mutable reference; `take()` makes
    // sure the user callback runs at most once.
    let mut pending = Some(callback);
    let run_once = move |data: &mut Data| {
        if let Some(idle_fn) = pending.take() {
            idle_fn(data);
        }
    };
    let shared = Rc::new(RefCell::new(Some(run_once)));
    self.inner.idles.borrow_mut().push(shared.clone());
    Idle { callback: shared }
}
/// Registers the source identified by `token` with the polling backend again.
///
/// A token that does not match any source in the loop is ignored and `Ok(())` is returned.
///
/// # Errors
///
/// Propagates the error returned by the source's `register`.
pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
    let sources = self.inner.sources.borrow();
    if let Some(source) = sources.get(token.key) {
        let mut poll = self.inner.poll.borrow_mut();
        let mut factory = TokenFactory::new(token.key);
        source.register(&mut poll, &mut factory)?;
    }
    Ok(())
}
/// Asks the source identified by `token` to re-register itself with the polling backend.
///
/// When the source reports (by returning `false`) that it did not apply the change, the
/// request is stored as a pending `PostAction::Reregister`, which `dispatch_events` picks up
/// after the currently running `process_events` call returns.
/// A token that does not match any source is ignored.
///
/// # Errors
///
/// Propagates the error returned by the source's `reregister`.
pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
    let sources = self.inner.sources.borrow();
    if let Some(source) = sources.get(token.key) {
        let mut poll = self.inner.poll.borrow_mut();
        let mut factory = TokenFactory::new(token.key);
        let applied = source.reregister(&mut poll, &mut factory)?;
        if !applied {
            // NOTE(review): presumably `false` means the source is busy inside its own
            // callback; confirm against the `EventDispatcher` implementation.
            self.inner.pending_action.set(PostAction::Reregister);
        }
    }
    Ok(())
}
/// Unregisters the source identified by `token` from the polling backend while keeping it
/// in the loop, so that `enable` can register it again later.
///
/// When the source reports (by returning `false`) that it did not apply the change, the
/// request is stored as a pending `PostAction::Disable`, which `dispatch_events` picks up
/// after the currently running `process_events` call returns.
/// A token that does not match any source is ignored.
///
/// # Errors
///
/// Propagates the error returned by the source's `unregister`.
pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
    let sources = self.inner.sources.borrow();
    if let Some(source) = sources.get(token.key) {
        let mut poll = self.inner.poll.borrow_mut();
        let applied = source.unregister(&mut poll)?;
        if !applied {
            self.inner.pending_action.set(PostAction::Disable);
        }
    }
    Ok(())
}
/// Removes the source identified by `token` from the loop and unregisters it from the
/// polling backend.
///
/// A token that does not match any source is ignored. A failure to unregister is only
/// logged, since the source has already been taken out of the loop at that point.
pub fn remove(&self, token: RegistrationToken) {
    let mut sources = self.inner.sources.borrow_mut();
    let source = match sources.try_remove(token.key) {
        Some(source) => source,
        None => return,
    };
    let mut poll = self.inner.poll.borrow_mut();
    if let Err(e) = source.unregister(&mut poll) {
        log::warn!(
            "[calloop] Failed to unregister source from the polling system: {:?}",
            e
        );
    }
}
/// Wraps `fd` in a `crate::io::Async` adapter bound to this loop's shared state.
///
/// # Errors
///
/// Propagates any error returned by `crate::io::Async::new`.
pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
    let inner = self.inner.clone();
    crate::io::Async::new(inner, fd)
}
}
/// The event loop itself: it polls the registered sources and dispatches their events and
/// the queued idle callbacks.
pub struct EventLoop<'l, Data> {
/// Handle to the shared loop state; cloned out by `EventLoop::handle`.
handle: LoopHandle<'l, Data>,
/// Flags shared with every `LoopSignal` created by `get_signal`.
signals: Arc<Signals>,
}
impl<Data> std::fmt::Debug for EventLoop<'_, Data> {
    /// Writes a fixed placeholder; the loop's internals are not printed.
    #[cfg_attr(feature = "nightly_coverage", no_coverage)]
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "EventLoop {{ ... }}")
    }
}
/// Atomic flags shared between an `EventLoop` and the `LoopSignal`s created from it.
struct Signals {
/// Set by `LoopSignal::stop`; `run` and `block_on` check it before every iteration and
/// reset it when they start.
stop: AtomicBool,
/// Set by the waker built in `block_on`; `block_on` swaps it back to `false` before polling
/// the future.
#[cfg(feature = "block_on")]
future_ready: AtomicBool,
}
impl<'l, Data> EventLoop<'l, Data> {
/// Creates a new event loop with no sources and no idle callbacks.
///
/// # Errors
///
/// Fails if the polling backend cannot be created (`Poll::new`).
pub fn try_new() -> crate::Result<Self> {
    let inner = LoopInner {
        poll: RefCell::new(Poll::new()?),
        sources: RefCell::new(Slab::new()),
        idles: RefCell::new(Vec::new()),
        pending_action: Cell::new(PostAction::Continue),
    };
    let signals = Signals {
        stop: AtomicBool::new(false),
        #[cfg(feature = "block_on")]
        future_ready: AtomicBool::new(false),
    };
    Ok(EventLoop {
        handle: LoopHandle {
            inner: Rc::new(inner),
        },
        signals: Arc::new(signals),
    })
}
pub fn handle(&self) -> LoopHandle<'l, Data> {
self.handle.clone()
}
/// Polls the backend once (waiting at most `timeout`, or indefinitely for `None`) and
/// dispatches every returned event to the source it belongs to.
///
/// After a source has processed an event, the `PostAction` it returned is applied. If that
/// action is `Continue`, an action stored through `LoopHandle::update` or
/// `LoopHandle::disable` while the callback was running is applied instead. A source that is
/// no longer in the loop afterwards is unregistered from the polling backend.
///
/// # Errors
///
/// Returns any polling error other than `Interrupted`, and any error returned by a source's
/// `process_events`, `reregister` or `unregister` (dispatching stops at the first error).
fn dispatch_events(
    &mut self,
    mut timeout: Option<Duration>,
    data: &mut Data,
) -> crate::Result<()> {
    let start = Instant::now();
    // The timeout as requested by the caller. `timeout` shrinks on every retry, so the
    // remaining time must always be derived from this value, never from `timeout` itself:
    // subtracting the total elapsed time from an already-shortened timeout would cut the
    // wait short on the second and later interruptions.
    let requested = timeout;
    let events = {
        let poll = self.handle.inner.poll.borrow();
        loop {
            let result = poll.poll(timeout);
            match result {
                Ok(events) => break events,
                Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
                    // Interrupted before anything was ready: retry with the time left.
                    if let Some(total) = requested {
                        let elapsed = start.elapsed();
                        if elapsed >= total {
                            // The whole timeout has been used up; nothing to dispatch.
                            return Ok(());
                        }
                        timeout = Some(total - elapsed);
                    }
                }
                Err(err) => return Err(err),
            };
        }
    };

    for event in events {
        // The low bits of the token key are the key of the source in the slab; everything
        // that addresses the source itself must use this masked value.
        let registration_key = event.token.key & MAX_SOURCES_MASK;
        let opt_disp = self
            .handle
            .inner
            .sources
            .borrow()
            .get(registration_key)
            .cloned();

        if let Some(disp) = opt_disp {
            let mut ret = disp.process_events(event.readiness, event.token, data)?;

            // A `Continue` result may be overridden by an action requested from within the
            // callback; the pending slot is reset in either case.
            let pending_action = self
                .handle
                .inner
                .pending_action
                .replace(PostAction::Continue);
            if let PostAction::Continue = ret {
                ret = pending_action;
            }

            match ret {
                PostAction::Reregister => {
                    // Seed the factory with the registration key, exactly as the initial
                    // registration does, not with the full key of the event's token.
                    disp.reregister(
                        &mut self.handle.inner.poll.borrow_mut(),
                        &mut TokenFactory::new(registration_key),
                    )?;
                }
                PostAction::Disable => {
                    disp.unregister(&mut self.handle.inner.poll.borrow_mut())?;
                }
                PostAction::Remove => {
                    // Drop the source from the table; it is unregistered just below.
                    // `try_remove` tolerates a source that already removed itself from
                    // within its callback, where `remove` would panic.
                    let _ = self
                        .handle
                        .inner
                        .sources
                        .borrow_mut()
                        .try_remove(registration_key);
                }
                PostAction::Continue => {}
            }

            if !self
                .handle
                .inner
                .sources
                .borrow()
                .contains(registration_key)
            {
                // The source left the loop (either just above or from within its
                // callback): make sure the polling backend forgets it as well.
                let mut poll = self.handle.inner.poll.borrow_mut();
                if let Err(e) = disp.unregister(&mut poll) {
                    log::warn!(
                        "[calloop] Failed to unregister source from the polling system: {:?}",
                        e
                    );
                }
            }
        } else {
            log::warn!(
                "[calloop] Received an event for non-existence source: {:?}",
                event.token.key
            );
        }
    }

    Ok(())
}
/// Runs every idle callback queued so far.
///
/// The queue is emptied before the first callback runs, so callbacks inserted while this
/// method is running are kept for the next call.
fn dispatch_idles(&mut self, data: &mut Data) {
    let queued = self.handle.inner.idles.replace(Vec::new());
    for idle in queued {
        let mut callback = idle.borrow_mut();
        callback.dispatch(data);
    }
}
fn invoke_pre_run(&self, data: &mut Data) -> crate::Result<()> {
let sources = self
.handle
.inner
.sources
.borrow()
.iter()
.map(|(_, source)| source.clone())
.collect::<Vec<_>>();
for source in sources {
source.pre_run(data)?;
}
Ok(())
}
fn invoke_post_run(&self, data: &mut Data) -> crate::Result<()> {
let sources = self
.handle
.inner
.sources
.borrow()
.iter()
.map(|(_, source)| source.clone())
.collect::<Vec<_>>();
for source in sources {
source.post_run(data)?;
}
Ok(())
}
/// Runs a single iteration of the loop: the `pre_run` hooks, one poll-and-dispatch pass
/// waiting at most `timeout` (`None` waits indefinitely), the idle callbacks, and finally
/// the `post_run` hooks.
///
/// # Errors
///
/// Returns the first error reported by a hook or by event dispatching; the later steps are
/// skipped in that case.
pub fn dispatch<D: Into<Option<Duration>>>(
    &mut self,
    timeout: D,
    data: &mut Data,
) -> crate::Result<()> {
    self.invoke_pre_run(data)?;

    self.dispatch_events(timeout.into(), data)?;
    self.dispatch_idles(data);

    self.invoke_post_run(data)
}
pub fn get_signal(&self) -> LoopSignal {
LoopSignal {
signal: self.signals.clone(),
notifier: self.handle.inner.poll.borrow().notifier(),
}
}
/// Runs the loop until it is stopped through a `LoopSignal`.
///
/// The stop flag is cleared first and the `pre_run` hooks run once. Each iteration then
/// dispatches events (waiting at most `timeout` per iteration), runs the idle callbacks and
/// calls `cb`. The `post_run` hooks run once after the loop has been stopped.
///
/// # Errors
///
/// Returns immediately with the first error reported by a hook or by event dispatching.
pub fn run<F, D: Into<Option<Duration>>>(
    &mut self,
    timeout: D,
    data: &mut Data,
    mut cb: F,
) -> crate::Result<()>
where
    F: FnMut(&mut Data),
{
    let timeout = timeout.into();
    self.signals.stop.store(false, Ordering::Release);
    self.invoke_pre_run(data)?;

    loop {
        if self.signals.stop.load(Ordering::Acquire) {
            break;
        }
        self.dispatch_events(timeout, data)?;
        self.dispatch_idles(data);
        cb(data);
    }

    self.invoke_post_run(data)
}
/// Runs the loop until `future` completes or the loop is stopped through a `LoopSignal`.
///
/// The future is polled once up front and again whenever its waker has fired. Between
/// polls the loop dispatches events (without a timeout), runs the idle callbacks and calls
/// `cb`.
///
/// Returns `Ok(Some(output))` when the future completed, and `Ok(None)` when the loop was
/// stopped first.
///
/// # Errors
///
/// Returns immediately with the first error reported by a hook or by event dispatching.
#[cfg(feature = "block_on")]
pub fn block_on<R>(
    &mut self,
    future: impl Future<Output = R>,
    data: &mut Data,
    mut cb: impl FnMut(&mut Data),
) -> crate::Result<Option<R>> {
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker that flags the future as ready and wakes the loop up.
    struct EventLoopWaker(LoopSignal);

    impl Wake for EventLoopWaker {
        fn wake(self: Arc<Self>) {
            Wake::wake_by_ref(&self);
        }

        fn wake_by_ref(self: &Arc<Self>) {
            let EventLoopWaker(loop_signal) = &**self;
            loop_signal
                .signal
                .future_ready
                .store(true, Ordering::Release);
            loop_signal.notifier.notify().ok();
        }
    }

    pin_utils::pin_mut!(future);

    let waker = Waker::from(Arc::new(EventLoopWaker(self.get_signal())));
    let mut context = Context::from_waker(&waker);

    self.signals.stop.store(false, Ordering::Release);
    // Start as "ready" so the future gets polled on the very first iteration.
    self.signals.future_ready.store(true, Ordering::Release);
    self.invoke_pre_run(data)?;

    let output = loop {
        if self.signals.stop.load(Ordering::Acquire) {
            break None;
        }

        let should_poll = self.signals.future_ready.swap(false, Ordering::AcqRel);
        if should_poll {
            if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
                break Some(result);
            }
        }

        self.dispatch_events(None, data)?;
        self.dispatch_idles(data);
        cb(data);
    };

    self.invoke_post_run(data)?;
    Ok(output)
}
}
/// Cloneable handle used to stop or wake up an `EventLoop`; obtained from
/// `EventLoop::get_signal`.
#[derive(Clone)]
pub struct LoopSignal {
/// Flags shared with the loop this signal was created from.
signal: Arc<Signals>,
/// Notifier obtained from the loop's `Poll`; `wakeup` calls `notify` on it.
notifier: Notifier,
}
impl std::fmt::Debug for LoopSignal {
    /// Writes a fixed placeholder; the signal's internals are not printed.
    #[cfg_attr(feature = "nightly_coverage", no_coverage)]
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "LoopSignal {{ ... }}")
    }
}
impl LoopSignal {
    /// Requests the loop to stop.
    ///
    /// This only sets the stop flag, which `run` and `block_on` check before each
    /// iteration; it does not wake the loop up. Call `wakeup` as well if the loop may be
    /// waiting.
    pub fn stop(&self) {
        let stop_flag = &self.signal.stop;
        stop_flag.store(true, Ordering::Release);
    }

    /// Notifies the loop's polling backend to wake it up. A failure to notify is ignored.
    pub fn wakeup(&self) {
        let _ = self.notifier.notify();
    }
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::{
generic::Generic, ping::*, Dispatcher, Interest, Mode, Poll, PostAction, Readiness,
RegistrationToken, Token, TokenFactory,
};
use super::EventLoop;
/// An idle callback inserted before a dispatch runs during that dispatch.
#[test]
fn dispatch_idle() {
    let mut event_loop = EventLoop::try_new().unwrap();
    let mut dispatched = false;

    event_loop.handle().insert_idle(|flag| *flag = true);

    let outcome = event_loop.dispatch(Some(Duration::ZERO), &mut dispatched);
    outcome.unwrap();
    assert!(dispatched);
}
/// A cancelled idle callback is not run by the next dispatch.
#[test]
fn cancel_idle() {
    let mut event_loop = EventLoop::try_new().unwrap();
    let mut dispatched = false;

    let handle = event_loop.handle();
    let idle = handle.insert_idle(|flag| *flag = true);
    idle.cancel();

    let outcome = event_loop.dispatch(Duration::ZERO, &mut dispatched);
    outcome.unwrap();
    assert!(!dispatched);
}
/// A dispatch without timeout returns once another thread calls `LoopSignal::wakeup`.
#[test]
fn wakeup() {
    use std::thread;

    let mut event_loop = EventLoop::try_new().unwrap();
    let signal = event_loop.get_signal();

    // Wake the loop up from another thread after a short delay.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        signal.wakeup();
    });

    // Without the wakeup this call would block forever.
    event_loop.dispatch(None, &mut ()).unwrap();
}
/// `run` returns once another thread sets the stop flag and then wakes the loop up.
#[test]
fn wakeup_stop() {
    use std::thread;

    let mut event_loop = EventLoop::try_new().unwrap();
    let signal = event_loop.get_signal();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        // Stop first, then wake: the loop checks the flag once the poll returns.
        signal.stop();
        signal.wakeup();
    });

    event_loop.run(None, &mut (), |_| {}).unwrap();
}
/// Inserting a source whose registration fails reports an error instead of panicking.
#[test]
fn insert_bad_source() {
    use std::os::unix::io::FromRawFd;

    let event_loop = EventLoop::<()>::try_new().unwrap();
    // NOTE(review): 420 is an arbitrary descriptor number that is assumed not to be open in
    // the test process, so that registration fails.
    let fd = unsafe { io_lifetimes::OwnedFd::from_raw_fd(420) };
    let source = crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level);

    let ret = event_loop
        .handle()
        .insert_source(source, |_, _, _| Ok(PostAction::Continue));

    assert!(ret.is_err());
}
/// A source with an empty interest set can still be registered and then removed.
#[test]
fn insert_source_no_interest() {
    use nix::unistd::{close, pipe};
    use std::os::unix::io::FromRawFd;

    // Only the read end of the pipe is kept, as a valid descriptor to register.
    let (read, write) = pipe().unwrap();
    let read = unsafe { io_lifetimes::OwnedFd::from_raw_fd(read) };
    close(write).unwrap();

    let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
    let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));

    let event_loop = EventLoop::<()>::try_new().unwrap();
    let handle = event_loop.handle();

    match handle.register_dispatcher(dispatcher.clone()) {
        Ok(token) => handle.remove(token),
        Err(_) => panic!(),
    }
}
/// A disabled source does not dispatch, and it delivers the event that arrived in the
/// meantime once it is enabled again.
#[test]
fn disarm_rearm() {
    let mut event_loop = EventLoop::<bool>::try_new().unwrap();
    let handle = event_loop.handle();
    let (ping, ping_source) = make_ping().unwrap();

    let ping_token = handle
        .insert_source(ping_source, |(), &mut (), dispatched| {
            *dispatched = true;
        })
        .unwrap();

    // Enabled: the ping is dispatched.
    ping.ping();
    let mut dispatched = false;
    event_loop
        .dispatch(Duration::ZERO, &mut dispatched)
        .unwrap();
    assert!(dispatched);

    // Disabled: the ping is sent but not dispatched.
    ping.ping();
    handle.disable(&ping_token).unwrap();
    dispatched = false;
    event_loop
        .dispatch(Duration::ZERO, &mut dispatched)
        .unwrap();
    assert!(!dispatched);

    // Enabled again: the ping sent while disabled is dispatched now.
    handle.enable(&ping_token).unwrap();
    dispatched = false;
    event_loop
        .dispatch(Duration::ZERO, &mut dispatched)
        .unwrap();
    assert!(dispatched);
}
/// A single source wrapping two ping sources dispatches events from either of them, and
/// from both within one dispatch.
#[test]
fn multiple_tokens() {
    /// Source made of two pings; reports `1` for the first and `2` for the second.
    struct DoubleSource {
        ping1: PingSource,
        ping2: PingSource,
    }

    impl crate::EventSource for DoubleSource {
        type Event = u32;
        type Metadata = ();
        type Ret = ();
        type Error = PingError;

        fn process_events<F>(
            &mut self,
            readiness: Readiness,
            token: Token,
            mut callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            // Both inner sources are offered the same readiness and token.
            self.ping1
                .process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
            self.ping2
                .process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
            Ok(PostAction::Continue)
        }

        fn register(
            &mut self,
            poll: &mut Poll,
            token_factory: &mut TokenFactory,
        ) -> crate::Result<()> {
            self.ping1.register(poll, token_factory)?;
            self.ping2.register(poll, token_factory)
        }

        fn reregister(
            &mut self,
            poll: &mut Poll,
            token_factory: &mut TokenFactory,
        ) -> crate::Result<()> {
            self.ping1.reregister(poll, token_factory)?;
            self.ping2.reregister(poll, token_factory)
        }

        fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
            self.ping1.unregister(poll)?;
            self.ping2.unregister(poll)
        }
    }

    let mut event_loop = EventLoop::<u32>::try_new().unwrap();

    let (ping1, source1) = make_ping().unwrap();
    let (ping2, source2) = make_ping().unwrap();
    let source = DoubleSource {
        ping1: source1,
        ping2: source2,
    };

    event_loop
        .handle()
        .insert_source(source, |i, _, d| {
            eprintln!("Dispatching {}", i);
            *d += i
        })
        .unwrap();

    // Only the first ping fires.
    let mut total = 0;
    ping1.ping();
    event_loop.dispatch(Duration::ZERO, &mut total).unwrap();
    assert_eq!(total, 1);

    // Only the second ping fires.
    total = 0;
    ping2.ping();
    event_loop.dispatch(Duration::ZERO, &mut total).unwrap();
    assert_eq!(total, 2);

    // Both fire within the same dispatch.
    total = 0;
    ping1.ping();
    ping2.ping();
    event_loop.dispatch(Duration::ZERO, &mut total).unwrap();
    assert_eq!(total, 3);
}
/// Changing a source's interest and calling `LoopHandle::update` changes which readiness
/// events get dispatched for it.
#[test]
fn change_interests() {
use nix::sys::socket::{recv, socketpair, AddressFamily, MsgFlags, SockFlag, SockType};
use nix::unistd::write;
use std::os::unix::io::{AsRawFd, FromRawFd};
let mut event_loop = EventLoop::<bool>::try_new().unwrap();
// A connected pair of Unix stream sockets: `sock1` is watched, `sock2` is written to.
let (sock1, sock2) = socketpair(
AddressFamily::Unix,
SockType::Stream,
None,
SockFlag::empty(), )
.unwrap();
let sock1 = unsafe { io_lifetimes::OwnedFd::from_raw_fd(sock1) };
let source = Generic::new(sock1, Interest::READ, Mode::Level);
// The callback records that it ran and drains everything that can be read without blocking.
let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
*dispatched = true;
let mut buf = [0u8; 32];
loop {
match recv(fd.as_raw_fd(), &mut buf, MsgFlags::MSG_DONTWAIT) {
// `Ok(0)` ends the drain loop; any other successful read keeps draining.
Ok(0) => break, Ok(_) => {}
Err(e) => {
let e: std::io::Error = e.into();
// `WouldBlock` means nothing is left to read; other errors are returned.
if e.kind() == std::io::ErrorKind::WouldBlock {
break;
} else {
return Err(e);
}
}
}
}
Ok(PostAction::Continue)
});
let sock_token_1 = event_loop
.handle()
.register_dispatcher(dispatcher.clone())
.unwrap();
// Nothing has been written yet: no dispatch is expected.
let mut dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
// After writing to the peer, the watched socket is dispatched.
write(sock2, &[1, 2, 3]).unwrap();
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);
// The callback drained the data: no further dispatch is expected.
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
// Switch the interest to WRITE and update the registration: a dispatch is expected.
dispatcher.as_source_mut().interest = Interest::WRITE;
event_loop.handle().update(&sock_token_1).unwrap();
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(dispatched);
// Switch back to READ: with nothing to read, no dispatch is expected.
dispatcher.as_source_mut().interest = Interest::READ;
event_loop.handle().update(&sock_token_1).unwrap();
dispatched = false;
event_loop
.dispatch(Duration::ZERO, &mut dispatched)
.unwrap();
assert!(!dispatched);
}
/// A source can remove itself from the loop from within its own callback.
#[test]
fn kill_source() {
    let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();

    let remover = event_loop.handle();
    let (ping, ping_source) = make_ping().unwrap();
    let ping_token = event_loop
        .handle()
        .insert_source(ping_source, move |(), &mut (), pending| {
            // The token of this very source is handed over through the loop data.
            if let Some(own_token) = pending.take() {
                remover.remove(own_token);
            }
        })
        .unwrap();

    ping.ping();

    let mut opt_src = Some(ping_token);
    event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();

    // The callback ran and took the token out of the loop data.
    assert!(opt_src.is_none());
}
/// The loop's data type may borrow from the enclosing scope (it does not have to be
/// `'static`).
#[test]
fn non_static_data() {
    use std::sync::mpsc;

    let (sender, receiver) = mpsc::channel();

    {
        /// Loop data holding a borrowed channel sender.
        struct RefSender<'a>(&'a mpsc::Sender<()>);
        let mut ref_sender = RefSender(&sender);

        let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
        let (ping, ping_source) = make_ping().unwrap();
        let _ping_token = event_loop
            .handle()
            .insert_source(ping_source, |_, _, borrowed| {
                let RefSender(channel) = borrowed;
                channel.send(()).unwrap();
            })
            .unwrap();

        ping.ping();

        event_loop
            .dispatch(Duration::ZERO, &mut ref_sender)
            .unwrap();
    }

    // The callback must have sent exactly through the borrowed sender.
    receiver.recv().unwrap();
    drop(sender);
}
/// `block_on` drives a future to completion and returns its output.
#[cfg(feature = "block_on")]
#[test]
fn block_on_test() {
    use crate::sources::timer::TimeoutFuture;
    use std::time::Duration;

    let mut evl = EventLoop::<()>::try_new().unwrap();
    let mut data = 22;

    let evl_handle = evl.handle();
    let data_ref = &mut data;
    let timeout = async move {
        TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
        *data_ref = 32;
        11
    };

    let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();

    // The future ran to the end: it produced its output and updated `data`.
    assert_eq!(result, Some(11));
    assert_eq!(data, 32);
}
/// `block_on` returns `None` when the loop is stopped before the future completes.
#[cfg(feature = "block_on")]
#[test]
fn block_on_early_cancel() {
    use crate::sources::timer;
    use std::time::Duration;

    let mut evl = EventLoop::<()>::try_new().unwrap();
    let mut data = 22;

    // The future needs two seconds to complete ...
    let evl_handle = evl.handle();
    let data_ref = &mut data;
    let timeout = async move {
        timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
        *data_ref = 32;
        11
    };

    // ... but a one-second timer stops the loop first.
    let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
    let stopper = evl.get_signal();
    let _timer_token = evl
        .handle()
        .insert_source(timer_source, move |_, _, _| {
            stopper.stop();
            timer::TimeoutAction::Drop
        })
        .unwrap();

    let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();

    // The future never finished: no output, and `data` is untouched.
    assert_eq!(result, None);
    assert_eq!(data, 22);
}
/// Minimal event source for tests: it never touches the polling backend and invokes the
/// callback once every time it is asked to process events.
struct DummySource;

impl crate::EventSource for DummySource {
    type Event = ();
    type Metadata = ();
    type Ret = ();
    type Error = crate::Error;

    fn process_events<F>(
        &mut self,
        _readiness: Readiness,
        _token: Token,
        mut callback: F,
    ) -> Result<PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        let mut metadata = ();
        callback((), &mut metadata);
        Ok(PostAction::Continue)
    }

    // Registration is a no-op: there is nothing to hand to the polling backend.
    fn register(&mut self, _poll: &mut Poll, _factory: &mut TokenFactory) -> crate::Result<()> {
        Ok(())
    }

    fn reregister(&mut self, _poll: &mut Poll, _factory: &mut TokenFactory) -> crate::Result<()> {
        Ok(())
    }

    fn unregister(&mut self, _poll: &mut Poll) -> crate::Result<()> {
        Ok(())
    }
}
}