#![deny(missing_docs)]
#![allow(unknown_lints)]
#![cfg_attr(feature = "integer_atomics", feature(integer_atomics))]
#![cfg_attr(feature = "conservative_impl_trait", feature(conservative_impl_trait))]
extern crate fnv;
#[macro_use]
extern crate trace_error;
extern crate futures;
extern crate futures_cpupool;
use std::any::Any;
use std::sync::{Arc, RwLock};
use std::sync::atomic::Ordering;
use std::hash::Hash;
use fnv::FnvHashMap;
use std::collections::hash_map::Entry;
use futures::Future;
use futures_cpupool::CpuPool;
#[cfg(not(feature = "conservative_impl_trait"))]
use futures::BoxFuture;
use trace_error::Trace;
pub mod error;
pub use self::error::*;
/// Marker trait for types usable as event keys.
///
/// A key must be hashable and comparable (it is stored in a hash map),
/// cloneable, sendable to other threads and free of non-`'static` borrows.
/// `Eq` already implies `PartialEq`, so only `Eq` is spelled out here.
pub trait EventKey: Hash + Eq + Clone + Send + 'static {}

/// Blanket implementation: every type meeting the bounds is an `EventKey`.
impl<T: Hash + Eq + Clone + Send + 'static> EventKey for T {}
mod internal {
use std::any::Any;
use std::sync::{Arc, RwLock, Mutex};
use trace_error::Trace;
use fnv::FnvHashMap;
use futures::{self, future, Future, BoxFuture};
use futures_cpupool::CpuPool;
#[cfg(feature = "integer_atomics")]
pub use std::sync::atomic::AtomicU64 as AtomicListenerId;
#[cfg(not(feature = "integer_atomics"))]
pub use std::sync::atomic::AtomicUsize as AtomicListenerId;
use super::{ListenerId, EventError, EventResult, EventKey};
/// Maps the unit result of a listener callback to `true`, meaning
/// "the listener ran". Intended for use with `Result::map`.
// `inline_always` is a clippy lint name; the attribute silences clippy's
// complaint about the forced inlining below.
#[allow(inline_always)]
#[inline(always)]
pub fn ran(_unit: ()) -> bool {
    true
}
/// Counts how many entries of `executed` are `true`, i.e. how many
/// listeners reported that they actually ran.
pub fn count_ran(executed: Vec<bool>) -> usize {
    let mut total = 0;
    for did_run in executed {
        if did_run {
            total += 1;
        }
    }
    total
}
/// Type-erased payload handed to a listener callback.
///
/// The value is either owned outright by the receiving callback or shared
/// behind an `Arc`, in which case receivers can only borrow (or clone) it.
pub enum ArcCowish {
/// A boxed value that belongs exclusively to the receiving callback.
Owned(Box<Any + Send>),
/// A boxed value shared between several callbacks through an `Arc`.
Borrowed(Arc<Box<Any + Send>>),
}
// NOTE(review): `Arc<Box<Any + Send>>` is not automatically `Send` because the
// erased type is not known to be `Sync`. The only constructor of `Borrowed`
// visible in this file (`emit_value_sync_spawn`) requires `T: Send + Sync`;
// confirm no other code builds a `Borrowed` from a non-`Sync` value.
unsafe impl Send for ArcCowish {}
/// Boxed, type-erased listener callback.
///
/// Receives the listener's own id and an optional payload, and returns
/// whether the listener actually ran (`Ok(true)`) or was skipped (`Ok(false)`).
// NOTE(review): the trait object carries no `Send` bound even though listeners
// are invoked on pool threads — thread-safety is asserted manually elsewhere.
pub type SyncCallback = Box<FnMut(ListenerId, Option<ArcCowish>) -> EventResult<bool>>;
/// A registered listener: its id plus the callback to invoke.
pub struct SyncEventListener {
/// Identifier assigned when the listener was registered.
pub id: ListenerId,
/// The callback, behind a `Mutex` so that it is locked for the duration of
/// each invocation.
pub cb: Mutex<SyncCallback>,
}
// NOTE(review): these impls are required because `SyncCallback` has no `Send`
// bound. They are only sound if every registered closure is in fact safe to
// call from another thread; the registration API in this file does not
// enforce that with a bound — confirm against intended usage.
unsafe impl Send for SyncEventListener {}
unsafe impl Sync for SyncEventListener {}
impl SyncEventListener {
    /// Wraps `cb` in a `Mutex`, pairs it with `id` and returns the listener
    /// behind an `Arc` so it can be shared with spawned tasks.
    pub fn new(id: ListenerId, cb: SyncCallback) -> Arc<SyncEventListener> {
        let listener = SyncEventListener {
            id: id,
            cb: Mutex::new(cb),
        };
        Arc::new(listener)
    }
}
/// Shared handle to a single registered listener.
pub type SyncEventListenerLock = Arc<SyncEventListener>;
/// Shared, lock-protected list of all listeners registered for one event.
pub type SyncListenersLock = Arc<RwLock<Vec<SyncEventListenerLock>>>;
/// Shared state behind a `ParallelEventEmitter`.
pub struct Inner<K: EventKey> {
/// Map from event key to the list of listeners registered for that event.
pub events: RwLock<FnvHashMap<K, SyncListenersLock>>,
/// Atomic counter from which new listener ids are drawn.
pub counter: AtomicListenerId,
/// Thread pool on which emits and listener callbacks are executed.
pub pool: CpuPool,
}
// NOTE(review): `EventKey` requires `Send` but not `Sync`, so `Inner<K>` would
// not be `Sync` automatically. Keys are only reached through the `RwLock`,
// but read guards allow concurrent shared access to `K` — confirm this is
// acceptable for all key types.
unsafe impl<K: EventKey> Send for Inner<K> {}
unsafe impl<K: EventKey> Sync for Inner<K> {}
/// Invokes a by-value listener callback with the payload downcast to `T`.
///
/// * A shared (`Borrowed`) payload of type `T` is cloned for the callback.
/// * An `Owned` payload of type `T` is unboxed and moved into the callback.
/// * A missing payload, or one of a different type, results in `cb(None)`.
pub fn invoke_value_cb<T, F>(arg: Option<ArcCowish>, cb: &F) -> EventResult<()> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
    match arg {
        Some(ArcCowish::Borrowed(shared)) => {
            if let Some(typed) = shared.downcast_ref::<T>() {
                return cb(Some(typed.clone()));
            }
        }
        Some(ArcCowish::Owned(boxed)) => {
            if let Ok(typed) = boxed.downcast::<T>() {
                return cb(Some(*typed));
            }
        }
        None => {}
    }
    // No payload, or the payload was not a `T`.
    cb(None)
}
/// Invokes a by-reference listener callback with the payload downcast to `&T`.
///
/// Both `Borrowed` and `Owned` payloads are lent to the callback without
/// copying. A missing payload, or one of a different type, results in
/// `cb(None)`.
pub fn invoke_sync_cb<T, F>(arg: Option<ArcCowish>, cb: &F) -> EventResult<()> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
    match arg {
        Some(ArcCowish::Borrowed(shared)) => {
            if let Some(typed) = shared.downcast_ref::<T>() {
                return cb(Some(typed));
            }
        }
        Some(ArcCowish::Owned(boxed)) => {
            if let Some(typed) = boxed.downcast_ref::<T>() {
                return cb(Some(typed));
            }
        }
        None => {}
    }
    // No payload, or the payload was not a `T`.
    cb(None)
}
/// Spawns every listener registered for `event` on the pool, without a payload.
///
/// Returns a future resolving to the number of listeners that reported
/// having run. If the event has no listeners the future resolves to `0`
/// immediately. Fails early if one of the registry locks cannot be acquired.
pub fn emit_spawn<K: EventKey>(inner: Arc<Inner<K>>, event: K) -> EventResult<BoxFuture<usize, Trace<EventError>>> {
    let events = try_throw!(inner.events.read());
    if let Some(listeners_lock) = events.get(&event) {
        let listeners = try_throw!(listeners_lock.read());
        if !listeners.is_empty() {
            // One pool task per listener; each task owns its own `Arc` handle.
            let pending: Vec<_> = listeners.iter().map(|listener| {
                let listener = listener.clone();
                inner.pool.spawn_fn(move || -> EventResult<bool> {
                    let mut callback = try_throw!(listener.cb.lock());
                    (&mut *callback)(listener.id, None)
                })
            }).collect();
            return Ok(future::join_all(pending).map(count_ran).boxed());
        }
    }
    Ok(futures::finished(0).boxed())
}
/// Spawns every listener registered for `event` on the pool, giving each one
/// its own clone of `value` as an `Owned` payload.
///
/// Returns a future resolving to the number of listeners that reported
/// having run, or `0` immediately if the event has no listeners. Fails early
/// if one of the registry locks cannot be acquired.
pub fn emit_value_spawn<T, K: EventKey>(inner: Arc<Inner<K>>, event: K, value: T) -> EventResult<BoxFuture<usize, Trace<EventError>>> where T: Any + Clone + Send {
    let events = try_throw!(inner.events.read());
    if let Some(listeners_lock) = events.get(&event) {
        let listeners = try_throw!(listeners_lock.read());
        if !listeners.is_empty() {
            let pending: Vec<_> = listeners.iter().map(|listener| {
                let listener = listener.clone();
                // Every listener receives an independent copy of the value.
                let payload = value.clone();
                inner.pool.spawn_fn(move || -> EventResult<bool> {
                    let mut callback = try_throw!(listener.cb.lock());
                    (&mut *callback)(listener.id, Some(ArcCowish::Owned(Box::new(payload))))
                })
            }).collect();
            return Ok(future::join_all(pending).map(count_ran).boxed());
        }
    }
    Ok(futures::finished(0).boxed())
}
/// Spawns every listener registered for `event` on the pool, sharing a single
/// `value` between all of them as a `Borrowed` payload.
///
/// Returns a future resolving to the number of listeners that reported
/// having run, or `0` immediately if the event has no listeners. Fails early
/// if one of the registry locks cannot be acquired.
pub fn emit_value_sync_spawn<T, K: EventKey>(inner: Arc<Inner<K>>, event: K, value: T) -> EventResult<BoxFuture<usize, Trace<EventError>>> where T: Any + Send + Sync {
    /// Carries the shared, type-erased value into the spawned tasks.
    #[derive(Clone)]
    struct SendWrapper {
        inner: Arc<Box<Any + Send>>
    }
    // The erased `Any + Send` type has lost the `Sync` bound, so the `Arc` is
    // not `Send` on its own; the concrete `T` is `Send + Sync` per the
    // where-clause of the enclosing function.
    unsafe impl Send for SendWrapper {}

    let events = try_throw!(inner.events.read());
    if let Some(listeners_lock) = events.get(&event) {
        let listeners = try_throw!(listeners_lock.read());
        if !listeners.is_empty() {
            let shared = SendWrapper { inner: Arc::new(Box::new(value)) };
            let pending: Vec<_> = listeners.iter().map(|listener| {
                let listener = listener.clone();
                let shared = shared.clone();
                inner.pool.spawn_fn(move || -> EventResult<bool> {
                    let mut callback = try_throw!(listener.cb.lock());
                    (&mut *callback)(listener.id, Some(ArcCowish::Borrowed(shared.inner)))
                })
            }).collect();
            return Ok(future::join_all(pending).map(count_ran).boxed());
        }
    }
    Ok(futures::finished(0).boxed())
}
}
use internal::{Inner, ArcCowish, SyncEventListener};
/// Identifier handed out for every registered listener.
///
/// Ids are drawn from an atomic counter when a listener is added and are
/// what `remove_listener` / `remove_any_listener` take to remove it again.
pub type ListenerId = u64;
/// Event emitter that runs listener callbacks on a `CpuPool`.
///
/// `K` is the type used to identify events; it defaults to `String`.
pub struct ParallelEventEmitter<K: EventKey = String> {
// Shared state (listener registry, id counter, thread pool).
inner: Arc<internal::Inner<K>>,
}
impl<K: EventKey> Default for ParallelEventEmitter<K> {
    /// Equivalent to `ParallelEventEmitter::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl<K: EventKey> ParallelEventEmitter<K> {
pub fn new() -> ParallelEventEmitter<K> {
ParallelEventEmitter::with_pool(CpuPool::new_num_cpus())
}
/// Creates an emitter that runs its listeners on the given `pool`.
///
/// The emitter starts with no registered events and a listener id counter
/// of zero.
pub fn with_pool(pool: CpuPool) -> ParallelEventEmitter<K> {
    let state = Inner {
        pool: pool,
        counter: internal::AtomicListenerId::new(0),
        events: RwLock::new(FnvHashMap::default()),
    };
    ParallelEventEmitter { inner: Arc::new(state) }
}
/// Returns a clone of every event key currently present in the registry.
///
/// # Errors
///
/// Fails if the registry lock cannot be acquired.
pub fn event_names(&self) -> EventResult<Vec<K>> {
    let events = try_throw!(self.inner.events.read());
    let mut names = Vec::with_capacity(events.len());
    for name in events.keys() {
        names.push(name.clone());
    }
    Ok(names)
}
/// Calls `visitor` once for every event key currently present in the
/// registry, without cloning the keys.
///
/// The visitor is invoked sequentially on the calling thread while the
/// registry read lock is held, so it may freely mutate its own captured
/// state (hence the `FnMut` bound; any `Fn` closure is still accepted).
///
/// # Errors
///
/// Fails if the registry lock cannot be acquired.
pub fn event_names_visitor<F>(&self, mut visitor: F) -> EventResult<()> where F: FnMut(&K) {
    let events = try_throw!(self.inner.events.read());
    for key in events.keys() {
        visitor(key);
    }
    Ok(())
}
/// Registers `cb` as a listener for `event` and returns its new id.
///
/// The id is taken from the shared counter while the registry write lock is
/// held, and the listener is appended to the event's list (creating the list
/// if the event has none yet).
///
/// # Errors
///
/// Fails if the registry lock or the event's listener-list lock cannot be
/// acquired.
// NOTE(review): `F` carries no `Send` bound although listeners are later run
// on pool threads — confirm this is intended.
fn add_listener_impl<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<bool> + 'static {
    let counter = &self.inner.counter;
    match try_throw!(self.inner.events.write()).entry(event) {
        Entry::Vacant(slot) => {
            // First listener for this event: create its list.
            let id = counter.fetch_add(1, Ordering::Relaxed) as ListenerId;
            let listener = SyncEventListener::new(id, Box::new(cb));
            slot.insert(Arc::new(RwLock::new(vec![listener])));
            Ok(id)
        }
        Entry::Occupied(existing) => {
            let mut listeners = try_throw!(existing.get().write());
            let id = counter.fetch_add(1, Ordering::Relaxed) as ListenerId;
            listeners.push(SyncEventListener::new(id, Box::new(cb)));
            Ok(id)
        }
    }
}
/// Registers a listener whose callback returns `()` on success.
///
/// The callback is adapted so that every successful invocation is reported
/// as "ran" (`Ok(true)`); errors pass through unchanged.
#[inline]
fn add_listener_impl_simple<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<()> + 'static {
    let adapted = move |id: ListenerId, arg: Option<ArcCowish>| -> EventResult<bool> {
        let outcome = cb(id, arg);
        outcome.map(internal::ran)
    };
    self.add_listener_impl(event, adapted)
}
/// Registers a listener that runs at most once.
///
/// When invoked, the listener first removes itself from the event's listener
/// list and only then calls `cb`. If it is no longer registered (it was
/// removed, or another invocation already claimed it) the callback is skipped
/// and `Ok(false)` is reported.
///
/// Only the per-event listener list is modified, so the registry map itself
/// is taken with a *read* lock (as `remove_listener` does); the exclusive
/// lock is limited to the single list being edited. All locks are released
/// before `cb` is called.
///
/// # Errors
///
/// Registration fails if a registry lock cannot be acquired. The listener
/// itself reports an error if a lock cannot be acquired or if `cb` fails.
///
/// # Panics
///
/// The listener panics if it is invoked after the emitter's shared state has
/// been dropped.
fn once_impl<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<()> + 'static {
    // A weak handle avoids a reference cycle between the registry and the
    // listener stored inside it.
    let inner_weak = Arc::downgrade(&self.inner);
    let key = event.clone();
    self.add_listener_impl(event, move |id, arg| -> EventResult<bool> {
        let inner = inner_weak.upgrade().expect("Listener invoked after owning ParallelEventEmitter was dropped");
        let events = try_throw!(inner.events.read());
        match events.get(&key) {
            Some(listeners_lock) => {
                let mut listeners = try_throw!(listeners_lock.write());
                // Ids are appended in increasing order, so the list is sorted.
                if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
                    listeners.remove(index);
                } else {
                    return Ok(false);
                }
            }
            None => {
                return Ok(false);
            }
        }
        // Release the registry before running user code.
        drop(events);
        cb(id, arg).map(internal::ran)
    })
}
#[inline]
pub fn add_listener<F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where F: Fn() -> EventResult<()> + 'static {
self.add_listener_impl_simple(event.into(), move |_, _| -> EventResult<()> { cb() })
}
#[inline]
pub fn once<F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where F: Fn() -> EventResult<()> + 'static {
self.once_impl(event.into(), move |_, _| cb())
}
#[inline]
pub fn add_listener_value<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
self.add_listener_impl_simple(event.into(), move |_, arg| internal::invoke_value_cb::<T, F>(arg, &cb))
}
#[inline]
pub fn once_value<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
self.once_impl(event.into(), move |_, arg| internal::invoke_value_cb::<T, F>(arg, &cb))
}
#[inline]
pub fn add_listener_sync<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
self.add_listener_impl_simple(event.into(), move |_, arg| internal::invoke_sync_cb::<T, F>(arg, &cb))
}
#[inline]
pub fn once_sync<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
self.once_impl(event.into(), move |_, arg| internal::invoke_sync_cb::<T, F>(arg, &cb))
}
/// Removes the listener with the given `id` from `event`.
///
/// Returns `Ok(true)` if the listener was found and removed, `Ok(false)` if
/// the event is unknown or has no listener with that id.
///
/// # Errors
///
/// Fails if a registry lock cannot be acquired.
pub fn remove_listener<E: Into<K>>(&mut self, event: E, id: ListenerId) -> EventResult<bool> {
    let events = try_throw!(self.inner.events.read());
    let key = event.into();
    let listeners_lock = match events.get(&key) {
        Some(found) => found,
        None => return Ok(false),
    };
    let mut listeners = try_throw!(listeners_lock.write());
    // Ids are appended in increasing order, so the list is sorted by id.
    let position = listeners.binary_search_by_key(&id, |listener| listener.id);
    match position {
        Ok(index) => {
            listeners.remove(index);
            Ok(true)
        }
        Err(_) => Ok(false),
    }
}
/// Removes the listener with the given `id`, searching every event.
///
/// Returns `Ok(true)` as soon as the listener is found and removed, or
/// `Ok(false)` if no event has a listener with that id.
///
/// # Errors
///
/// Fails if a registry lock cannot be acquired.
pub fn remove_any_listener(&mut self, id: ListenerId) -> EventResult<bool> {
    let events = try_throw!(self.inner.events.read());
    for listeners_lock in events.values() {
        let mut listeners = try_throw!(listeners_lock.write());
        // Ids are appended in increasing order, so each list is sorted by id.
        let position = listeners.binary_search_by_key(&id, |listener| listener.id);
        if let Ok(index) = position {
            listeners.remove(index);
            return Ok(true);
        }
    }
    Ok(false)
}
/// Emits `event` without a payload.
///
/// Dispatch runs as a task on the emitter's pool, which in turn spawns one
/// task per registered listener. The returned future resolves to the number
/// of listeners that reported having run; lock or listener errors are
/// surfaced through the future.
#[cfg(feature = "conservative_impl_trait")]
pub fn emit<E: Into<K>>(&mut self, event: E) -> impl Future<Item = usize, Error = Trace<EventError>> {
    let key = event.into();
    let shared = self.inner.clone();
    let dispatch = self.inner.pool.spawn_fn(move || internal::emit_spawn(shared, key));
    dispatch.flatten()
}
/// Emits `event` without a payload.
///
/// Dispatch runs as a task on the emitter's pool, which in turn spawns one
/// task per registered listener. The returned future resolves to the number
/// of listeners that reported having run; lock or listener errors are
/// surfaced through the future.
#[cfg(not(feature = "conservative_impl_trait"))]
pub fn emit<E: Into<K>>(&mut self, event: E) -> BoxFuture<usize, Trace<EventError>> {
    let key = event.into();
    let shared = self.inner.clone();
    let dispatch = self.inner.pool.spawn_fn(move || internal::emit_spawn(shared, key));
    dispatch.flatten().boxed()
}
/// Emits `event` with `value`, handing every listener its own clone.
///
/// Dispatch runs as a task on the emitter's pool, which in turn spawns one
/// task per registered listener. The returned future resolves to the number
/// of listeners that reported having run; lock or listener errors are
/// surfaced through the future.
#[cfg(feature = "conservative_impl_trait")]
pub fn emit_value<T, E: Into<K>>(&mut self, event: E, value: T) -> impl Future<Item = usize, Error = Trace<EventError>> where T: Any + Clone + Send {
    let key = event.into();
    let shared = self.inner.clone();
    let dispatch = self.inner.pool.spawn_fn(move || internal::emit_value_spawn(shared, key, value));
    dispatch.flatten()
}
/// Emits `event` with `value`, handing every listener its own clone.
///
/// Dispatch runs as a task on the emitter's pool, which in turn spawns one
/// task per registered listener. The returned future resolves to the number
/// of listeners that reported having run; lock or listener errors are
/// surfaced through the future.
#[cfg(not(feature = "conservative_impl_trait"))]
pub fn emit_value<T, E: Into<K>>(&mut self, event: E, value: T) -> BoxFuture<usize, Trace<EventError>> where T: Any + Clone + Send {
    let key = event.into();
    let shared = self.inner.clone();
    let dispatch = self.inner.pool.spawn_fn(move || internal::emit_value_spawn(shared, key, value));
    dispatch.flatten().boxed()
}
/// Emits `event` with `value`, sharing the single value between all
/// listeners instead of cloning it.
///
/// Dispatch runs as a task on the emitter's pool, which in turn spawns one
/// task per registered listener. The returned future resolves to the number
/// of listeners that reported having run; lock or listener errors are
/// surfaced through the future.
#[cfg(feature = "conservative_impl_trait")]
pub fn emit_value_sync<T, E: Into<K>>(&mut self, event: E, value: T) -> impl Future<Item = usize, Error = Trace<EventError>> where T: Any + Send + Sync {
    let key = event.into();
    let shared = self.inner.clone();
    let dispatch = self.inner.pool.spawn_fn(move || internal::emit_value_sync_spawn(shared, key, value));
    dispatch.flatten()
}
/// Emits `event` with `value`, sharing the single value between all
/// listeners instead of cloning it.
///
/// Dispatch runs as a task on the emitter's pool, which in turn spawns one
/// task per registered listener. The returned future resolves to the number
/// of listeners that reported having run; lock or listener errors are
/// surfaced through the future.
#[cfg(not(feature = "conservative_impl_trait"))]
pub fn emit_value_sync<T, E: Into<K>>(&mut self, event: E, value: T) -> BoxFuture<usize, Trace<EventError>> where T: Any + Send + Sync {
    let key = event.into();
    let shared = self.inner.clone();
    let dispatch = self.inner.pool.spawn_fn(move || internal::emit_value_sync_spawn(shared, key, value));
    dispatch.flatten().boxed()
}
}