// Per-thread state of the event loop. Each thread that touches this module gets
// its own independent copy of every slot below.
thread_local! {
// The mio poller this thread's loop blocks on. Creation failure panics (`unwrap`).
static POLL: ::mio::Poll = ::mio::Poll::new().unwrap();
// Event listeners keyed by callback ID; `dispatch_event` looks a listener up by the
// numeric value of the event's token.
static EVENT_HANDLERS: ::std::cell::RefCell<::std::collections::BTreeMap<usize, ::std::rc::Rc<dyn Fn(::mio::Event) -> () + 'static>>> = ::std::default::Default::default();
// Pending timeouts and intervals keyed by callback ID.
static TIME_CALLBACKS: ::std::cell::RefCell<::std::collections::BTreeMap<usize, TimeCallback>> = ::std::default::Default::default();
// Next callback ID handed out by `tick`. Starts at 1; `dispatch_event` treats token 0
// as the wake-up for callbacks sent from other threads.
static TICKER: ::std::cell::RefCell<usize> = ::std::cell::RefCell::new(1);
// Reusable event buffer (capacity 1024) that `run` / `run_worker` hand to the poller.
static EVENT_BUFFER: ::std::rc::Rc<::std::cell::RefCell<::mio::Events>> = ::std::rc::Rc::new(::std::cell::RefCell::new(::mio::Events::with_capacity(1024)));
// Receiving end of the channel filled by `run_in_thread`; `None` until `init_loop` runs.
static REMOTE_RECEIVER: ::std::cell::RefCell<Option<::std::sync::mpsc::Receiver<Box<dyn Fn() -> () + Send + 'static>>>> = ::std::default::Default::default();
// The `Registration` created by `init_loop`; `None` until then.
// NOTE(review): presumably stored here only to keep it alive while registered - confirm.
static REMOTE_REGISTRATION: ::std::cell::RefCell<Option<::mio::Registration>> = ::std::default::Default::default();
// Loop flag: set to `true` by `init_loop`, to `false` by `stop`; defaults to `false`.
static SHOULD_CONTINUE: ::std::cell::RefCell<bool> = ::std::default::Default::default();
}
// Process-wide registry, shared by all threads behind a mutex. `init_loop` inserts the
// calling thread's `Remote` under its `ThreadId`; `run_in_thread` looks a thread up here
// to send it a callback and wake its poller.
lazy_static! {
static ref REMOTES: ::std::sync::Mutex<::std::collections::HashMap<::std::thread::ThreadId, Remote>> = ::std::default::Default::default();
}
/// A scheduled timer entry stored in `TIME_CALLBACKS`.
struct TimeCallback {
/// The function to invoke when the timer is due.
callback: ::std::rc::Rc<dyn Fn() -> () + 'static>,
/// The instant at (or after) which the callback should next fire.
when: ::std::time::Instant,
/// `Some(period)` for a repeating interval, `None` for a one-shot timeout.
interval: Option<::std::time::Duration>,
}
/// Handles that let other threads reach one thread's event loop (stored in `REMOTES`).
struct Remote {
/// Used by `run_in_thread` to mark the loop's token-0 registration as readable.
set_readiness: ::mio::SetReadiness,
/// Sending end of the channel whose receiver lives in that thread's `REMOTE_RECEIVER`.
run_sender: ::std::sync::mpsc::Sender<Box<dyn Fn() -> () + Send + 'static>>,
}
/// Returns the next unused callback ID for the current thread and advances the counter.
///
/// # Panics
///
/// Panics with "Ran out of callback IDs" if the counter cannot be incremented.
fn tick() -> usize {
    TICKER.with(|counter| {
        let mut next = counter.borrow_mut();
        let issued: usize = *next;
        *next = issued.checked_add(1).expect("Ran out of callback IDs");
        issued
    })
}
/// Registers `listener` on the current thread's loop and returns its callback ID.
///
/// `dispatch_event` invokes the listener for events whose token value equals the
/// returned ID, so callers are expected to register their mio sources with
/// `Token(id)`.
pub fn insert_listener(listener: impl Fn(::mio::Event) -> () + 'static) -> usize {
    let idx = tick();
    let handler: ::std::rc::Rc<dyn Fn(::mio::Event) -> () + 'static> = ::std::rc::Rc::new(listener);
    EVENT_HANDLERS.with(|handlers| {
        handlers.borrow_mut().insert(idx, handler);
    });
    idx
}
/// Unregisters the listener with ID `idx` on the current thread.
///
/// Returns `true` if a listener was removed, `false` if no listener had that ID.
pub fn remove_listener(idx: usize) -> bool {
    EVENT_HANDLERS.with(|handlers| {
        let mut map = handlers.borrow_mut();
        let removed = map.remove(&idx);
        removed.is_some()
    })
}
/// Schedules `callback` to run once, `timeout` from now, on the current thread's loop.
///
/// Returns the timer's ID, which can be passed to `clear_timeout`.
pub fn set_timeout(callback: impl Fn() -> () + 'static, timeout: ::std::time::Duration) -> usize {
    let entry = TimeCallback {
        callback: ::std::rc::Rc::new(callback),
        when: ::std::time::Instant::now() + timeout,
        // `None` marks this entry as a one-shot timer.
        interval: None,
    };
    let idx = tick();
    TIME_CALLBACKS.with(|timers| {
        timers.borrow_mut().insert(idx, entry);
    });
    idx
}
/// Schedules `callback` to run repeatedly on the current thread's loop, first firing
/// `interval` from now.
///
/// Returns the timer's ID, which can be passed to `clear_interval`.
pub fn set_interval(callback: impl Fn() -> () + 'static, interval: ::std::time::Duration) -> usize {
    let entry = TimeCallback {
        callback: ::std::rc::Rc::new(callback),
        when: ::std::time::Instant::now() + interval,
        // `Some(..)` marks this entry as repeating; `dispatch_timeout` re-arms it.
        interval: Some(interval),
    };
    let idx = tick();
    TIME_CALLBACKS.with(|timers| {
        timers.borrow_mut().insert(idx, entry);
    });
    idx
}
pub fn clear_timeout(idx: usize) -> bool {
if TIME_CALLBACKS.with(|x| x.borrow().get(&idx).map(|y| y.interval.is_some()).unwrap_or(true)) {
return false;
}
TIME_CALLBACKS.with(|x| x.borrow_mut().remove(&idx).is_some())
}
pub fn clear_interval(idx: usize) -> bool {
if TIME_CALLBACKS.with(|x| x.borrow().get(&idx).map(|y| y.interval.is_none()).unwrap_or(true)) {
return false;
}
TIME_CALLBACKS.with(|x| x.borrow_mut().remove(&idx).is_some())
}
/// Finds the timer that is due next on the current thread.
///
/// Returns `(None, None)` when no timers are registered. Otherwise returns the
/// timer's ID together with how long to wait until it is due; the wait is zero when
/// the timer is already due. When several timers share the earliest deadline, the
/// one with the lowest ID is chosen.
fn get_soonest_timeout() -> (Option<usize>, Option<::std::time::Duration>) {
    // A shared borrow is enough for this read-only scan. The map iterates in key
    // order and `min_by_key` keeps the first minimum, so ties go to the lowest ID.
    let soonest = TIME_CALLBACKS.with(|timers| {
        timers
            .borrow()
            .iter()
            .min_by_key(|(_, entry)| entry.when)
            .map(|(idx, entry)| (*idx, entry.when))
    });
    match soonest {
        None => (None, None),
        Some((idx, when)) => {
            let now = ::std::time::Instant::now();
            // Compare before subtracting so a deadline in the past yields a zero wait.
            let wait = if when <= now {
                ::std::time::Duration::from_secs(0)
            } else {
                when.duration_since(now)
            };
            (Some(idx), Some(wait))
        }
    }
}
fn dispatch_timeout(time_idx: usize) {
let now = ::std::time::Instant::now();
if TIME_CALLBACKS.with(|x| x.borrow().get(&time_idx).unwrap().when <= now) {
if TIME_CALLBACKS.with(|x| x.borrow().get(&time_idx).unwrap().interval.is_none()) {
let callback = TIME_CALLBACKS.with(|x| x.borrow_mut().remove(&time_idx).unwrap());
(callback.callback)();
} else {
let callback = TIME_CALLBACKS.with(|x| {
let interval = x.borrow().get(&time_idx).unwrap().interval.unwrap();
x.borrow_mut().get_mut(&time_idx).unwrap().when = now + interval;
x.borrow().get(&time_idx).unwrap().callback.clone()
});
(*callback)();
}
}
}
/// Runs `callback` with a reference to the current thread's `mio::Poll` and returns
/// whatever the callback returns.
///
/// The callback is invoked exactly once, so any `FnOnce` is accepted; closures that
/// are `Fn` or `FnMut` continue to work unchanged.
pub fn borrow_poll<T, R>(callback: T) -> R
where
    T: FnOnce(&::mio::Poll) -> R,
{
    POLL.with(|poll| callback(poll))
}
/// Routes one mio event to its handler.
///
/// Token 0 is the wake-up for callbacks sent from other threads: every callback
/// currently queued in `REMOTE_RECEIVER` is run. Any other token is looked up in
/// `EVENT_HANDLERS`; events with no matching listener are ignored.
///
/// # Panics
///
/// Panics on a token-0 event if `REMOTE_RECEIVER` has not been initialised.
fn dispatch_event(event: ::mio::Event) {
    let token: usize = event.token().0;
    if token != 0 {
        // Clone the handler out so the map is not borrowed while it runs.
        let handler = EVENT_HANDLERS.with(|handlers| handlers.borrow().get(&token).cloned());
        if let Some(handler) = handler {
            (*handler)(event);
        }
        return;
    }
    loop {
        // The receiver borrow ends before the job runs.
        let received = REMOTE_RECEIVER.with(|slot| slot.borrow().as_ref().unwrap().try_recv());
        match received {
            Ok(job) => (*job)(),
            Err(_) => break,
        }
    }
}
/// Returns `true` when the current thread has neither timers nor event listeners.
fn empty() -> bool {
    let no_timers = TIME_CALLBACKS.with(|timers| timers.borrow().is_empty());
    no_timers && EVENT_HANDLERS.with(|handlers| handlers.borrow().is_empty())
}
fn turn_internal(events: &mut ::mio::Events) {
let (time_idx, timeout) = get_soonest_timeout();
events.clear();
POLL.with(|x| x.poll(events, timeout)).unwrap();
if let Some(time_idx) = time_idx {
dispatch_timeout(time_idx);
}
for event in events.iter() {
dispatch_event(event);
}
}
pub fn run() {
init_loop();
let events_rc = EVENT_BUFFER.with(|x| x.clone());
let events: &mut ::mio::Events = &mut *events_rc.borrow_mut();
while SHOULD_CONTINUE.with(|x| *x.borrow()) {
if empty() {
return;
}
turn_internal(events);
}
}
pub fn run_worker() {
init_loop();
let events_rc = EVENT_BUFFER.with(|x| x.clone());
let events: &mut ::mio::Events = &mut *events_rc.borrow_mut();
while SHOULD_CONTINUE.with(|x| *x.borrow()) {
turn_internal(events);
}
}
/// Prepares the current thread's loop to run.
///
/// Always sets `SHOULD_CONTINUE` to `true`. The first time it is called on a thread
/// it also creates the cross-thread plumbing: a channel for remote callbacks, a mio
/// `Registration` registered with the poller under token 0, and an entry in
/// `REMOTES` so that `run_in_thread` can find this thread. Later calls skip that
/// setup.
///
/// # Panics
///
/// Panics if registering with the poller fails or if the `REMOTES` mutex is poisoned.
fn init_loop() {
    SHOULD_CONTINUE.with(|x| *x.borrow_mut() = true);
    // A stored receiver means the one-time setup below has already happened.
    if REMOTE_RECEIVER.with(|x| x.borrow().is_some()) {
        return;
    }
    let (tx, rx) = ::std::sync::mpsc::channel();
    let thread_id = ::std::thread::current().id();
    let (registration, set_readiness) = ::mio::Registration::new2();
    REMOTE_RECEIVER.with(|x| *x.borrow_mut() = Some(rx));
    // Token 0 is reserved for this wake-up; listener IDs from `tick` start at 1.
    borrow_poll(|poll| {
        poll.register(&registration, ::mio::Token(0), ::mio::Ready::readable(), ::mio::PollOpt::edge())
            .unwrap()
    });
    REMOTE_REGISTRATION.with(|x| *x.borrow_mut() = Some(registration));
    // Publish the handles last, once the receiver and registration are in place.
    let mut remotes_lock = REMOTES.lock().unwrap();
    remotes_lock.insert(
        thread_id,
        Remote {
            set_readiness,
            run_sender: tx,
        },
    );
}
/// Queues `callback` to run on the event loop of the thread identified by
/// `thread_id`, then wakes that loop.
///
/// If the target thread has not yet registered itself (it has not started `run` or
/// `run_worker`), this function spins, yielding the CPU between attempts, until it
/// does. It never returns if the target thread never starts a loop.
///
/// # Errors
///
/// Returns `Err(())` if the callback cannot be sent over the channel or if the
/// target's readiness cannot be set.
///
/// # Panics
///
/// Panics if the `REMOTES` mutex is poisoned.
pub fn run_in_thread(thread_id: ::std::thread::ThreadId, callback: impl Fn() -> () + Send + 'static) -> Result<(), ()> {
    let callback = Box::new(callback);
    loop {
        {
            let remotes = REMOTES.lock().unwrap();
            if let Some(remote) = remotes.get(&thread_id) {
                remote.run_sender.send(callback).map_err(|_| ())?;
                remote.set_readiness.set_readiness(::mio::Ready::readable()).map_err(|_| ())?;
                return Ok(());
            }
            // The guard is dropped here, before yielding: the target thread needs
            // this lock to register itself in `init_loop`.
        }
        ::std::thread::yield_now();
    }
}
/// Asks the current thread's loop to exit by clearing `SHOULD_CONTINUE`.
///
/// `run` and `run_worker` check the flag before each turn, so the loop ends once
/// the turn in progress has finished.
pub fn stop() {
    SHOULD_CONTINUE.with(|flag| {
        flag.replace(false);
    });
}