parallel_event_emitter/
lib.rs

//! Parallel Event Emitter
//!
//! Implementation of an event emitter that invokes event listener callbacks concurrently in a configurable thread pool,
//! using `Future`s to notify callers of success or errors.
//!
//! Because all values must be transferred across thread boundaries, all types `T` must be `Send`.
//!
//! Additionally, all types `T` must be `Any`, so `T: 'static`.
//!
//! ## Usage
//!
//! ```toml
//! [dependencies]
//! futures = "0.1"
//! parallel-event-emitter = "0.2.4"
//! ```
//!
//! Example using a `String` as the key:
//!
//! ```rust
//! extern crate futures;
//! extern crate parallel_event_emitter;
//!
//! use futures::Future;
//! use parallel_event_emitter::*;
//!
//! fn main() {
//!     let mut emitter: ParallelEventEmitter<String> = ParallelEventEmitter::new();
//!
//!     emitter.add_listener("some event", || {
//!         println!("Hello, World!");
//!
//!         Ok(())
//!     }).unwrap();
//!
//!     assert_eq!(1, emitter.emit("some event").wait().unwrap());
//! }
//! ```
//!
//! Or using a custom event type:
//!
//! ```rust
//! extern crate futures;
//! extern crate parallel_event_emitter;
//!
//! use futures::Future;
//! use parallel_event_emitter::*;
//!
//! #[derive(Debug, Hash, PartialEq, Eq, Clone)]
//! enum MyEvents {
//!     EventA,
//!     EventB,
//! }
//!
//! fn main() {
//!     let mut emitter: ParallelEventEmitter<MyEvents> = ParallelEventEmitter::new();
//!
//!     emitter.add_listener(MyEvents::EventA, || {
//!         println!("Hello, World!");
//!
//!         Ok(())
//!     }).unwrap();
//!
//!     assert_eq!(1, emitter.emit(MyEvents::EventA).wait().unwrap());
//! }
//! ```
//!
//! ## `Trace<E>` type
//!
//! This crate depends on the [`trace-error`](https://crates.io/crates/trace-error) crate to provide simple and lightweight backtraces on all error `Result`s.
//!
//! If you prefer not to work with `Trace<E>` directly, simply call `.into_error()` on any `Trace<E>` value to get the underlying error.
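//!
//! For instance, a rough sketch of unwrapping the error (assuming `EventError` implements `Debug`):
//!
//! ```ignore
//! use futures::Future;
//! use parallel_event_emitter::*;
//!
//! let mut emitter: ParallelEventEmitter<String> = ParallelEventEmitter::new();
//!
//! match emitter.emit("some event").wait() {
//!     Ok(count) => println!("invoked {} listeners", count),
//!     Err(trace) => {
//!         // Discard the backtrace and work with the plain error value
//!         let err: EventError = trace.into_error();
//!         println!("emit failed: {:?}", err);
//!     }
//! }
//! ```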
//!
//! ## `impl Trait` feature
//!
//! Instead of having all the `emit*` methods return a boxed `Future` (`BoxFuture`),
//! the Cargo feature **`conservative_impl_trait`** can be given to enable `impl Future` return types on
//! all the `emit*` methods. Note that this requires a nightly compiler, as it turns on the unstable `conservative_impl_trait` language feature.
//!
//! ```toml
//! [dependencies.parallel-event-emitter]
//! version = "0.2.4"
//! features = ["default", "conservative_impl_trait"] # And maybe integer_atomics
//! ```
//!
//! ## Larger `ListenerId`s
//!
//! Although the `ListenerId` type itself is `u64`,
//! the atomic counter underneath is restricted to `AtomicUsize` by default.
//!
//! To enable true guaranteed 64-bit counters, use the `integer_atomics` feature for the crate.
//!
//! ```toml
//! [dependencies.parallel-event-emitter]
//! version = "0.2.4"
//! features = ["default", "integer_atomics"] # And maybe conservative_impl_trait
//! ```
//!

#![deny(missing_docs)]
#![allow(unknown_lints)]

#![cfg_attr(feature = "integer_atomics", feature(integer_atomics))]
#![cfg_attr(feature = "conservative_impl_trait", feature(conservative_impl_trait))]

extern crate fnv;

#[macro_use]
extern crate trace_error;
extern crate futures;
extern crate futures_cpupool;

use std::any::Any;
use std::sync::{Arc, RwLock};

use std::sync::atomic::Ordering;

use std::hash::Hash;
use fnv::FnvHashMap;
use std::collections::hash_map::Entry;

use futures::Future;
use futures_cpupool::CpuPool;

// BoxFuture is unused if we use impl Trait instead
#[cfg(not(feature = "conservative_impl_trait"))]
use futures::BoxFuture;

use trace_error::Trace;

pub mod error;

pub use self::error::*;

/// Types that can be used as event identifiers
///
/// Example using an enum:
///
/// ```ignore
/// #[derive(Debug, Hash, PartialEq, Eq, Clone)]
/// enum MyEvents {
///     EventA,
///     EventB,
/// }
/// ```
pub trait EventKey: Hash + PartialEq + Eq + Clone + Send + 'static {}

impl<T> EventKey for T where T: Hash + PartialEq + Eq + Clone + Send + 'static {}

/// The internal module defines non-public functionality.
///
/// Putting most of it in its own module helps with organization.
mod internal {
    use std::any::Any;
    use std::sync::{Arc, RwLock, Mutex};

    use trace_error::Trace;

    use fnv::FnvHashMap;

    use futures::{self, future, Future, BoxFuture};
    use futures_cpupool::CpuPool;

    #[cfg(feature = "integer_atomics")]
    pub use std::sync::atomic::AtomicU64 as AtomicListenerId;

    #[cfg(not(feature = "integer_atomics"))]
    pub use std::sync::atomic::AtomicUsize as AtomicListenerId;

    use super::{ListenerId, EventError, EventResult, EventKey};

    /// Helper function used to map successful listener results to `true`
    #[allow(inline_always)]
    #[inline(always)]
    pub fn ran(_: ()) -> bool {
        true
    }

    /// Helper function to count successfully run listeners
    pub fn count_ran(executed: Vec<bool>) -> usize {
        executed.into_iter().filter(|ran| *ran).count()
    }

    /// This allows callbacks to take both values and references depending on the situation
    pub enum ArcCowish {
        /// Owned value that can be moved out
        Owned(Box<Any + Send>),
        /// Borrowed value that must be passed by reference
        Borrowed(Arc<Box<Any + Send>>),
    }

    unsafe impl Send for ArcCowish {}

    /// This handler callback takes the listener id and the argument,
    /// and returns `Ok(true)` if the listener callback was invoked correctly.
    pub type SyncCallback = Box<FnMut(ListenerId, Option<ArcCowish>) -> EventResult<bool>>;

    /// Stores the listener callback and its ID value
    ///
    /// The odd order of locks and `Arc`s is due to needing to access the `id` field without locking
    pub struct SyncEventListener {
        /// Immutable unique ID of the listener
        pub id: ListenerId,
        /// Lockable callback. A lock is required because the boxed `FnMut` needs mutable access to be invoked
        pub cb: Mutex<SyncCallback>,
    }

    unsafe impl Send for SyncEventListener {}

    unsafe impl Sync for SyncEventListener {}

    impl SyncEventListener {
        /// Create a new `SyncEventListener` from an id and callback
        pub fn new(id: ListenerId, cb: SyncCallback) -> Arc<SyncEventListener> {
            Arc::new(SyncEventListener { id: id, cb: Mutex::new(cb) })
        }
    }

    /// Reference counted `SyncEventListener`
    pub type SyncEventListenerLock = Arc<SyncEventListener>;

    /// Reference counted and lockable listener vector
    pub type SyncListenersLock = Arc<RwLock<Vec<SyncEventListenerLock>>>;

    /// Inner structure that can be referenced from within the threadpool and listener callbacks
    pub struct Inner<K: EventKey> {
        /// Event table
        pub events: RwLock<FnvHashMap<K, SyncListenersLock>>,
        /// Atomic counter that always increments
        pub counter: AtomicListenerId,
        /// Threadpool to dispatch listener callbacks in
        pub pool: CpuPool,
    }

    unsafe impl<K: EventKey> Send for Inner<K> {}

    unsafe impl<K: EventKey> Sync for Inner<K> {}

    /// Internal function used in callbacks in `add_listener_value` and `once_value`
    pub fn invoke_value_cb<T, F>(arg: Option<ArcCowish>, cb: &F) -> EventResult<()> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
        if let Some(arg) = arg {
            match arg {
                ArcCowish::Borrowed(value) => {
                    // If the value is borrowed, but T is Clone, we can clone a unique value
                    if let Some(value) = value.downcast_ref::<T>() {
                        return cb(Some(value.clone()));
                    }
                }
                ArcCowish::Owned(value) => {
                    // If it's owned, we just dereference it directly.
                    if let Ok(value) = value.downcast::<T>() {
                        return cb(Some(*value));
                    }
                }
            }
        }

        cb(None)
    }

    /// Internal function used in callbacks in `add_listener_sync` and `once_sync`
    pub fn invoke_sync_cb<T, F>(arg: Option<ArcCowish>, cb: &F) -> EventResult<()> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
        if let Some(arg) = arg {
            match arg {
                ArcCowish::Borrowed(value) => {
                    // If the value is borrowed, use the reference to the original copy
                    if let Some(value) = value.downcast_ref::<T>() {
                        return cb(Some(&*value));
                    }
                }
                ArcCowish::Owned(value) => {
                    // If it's owned, do the same, but just use a reference to the local copy
                    if let Some(value) = value.downcast_ref::<T>() {
                        return cb(Some(&*value));
                    }
                }
            }
        }

        cb(None)
    }

    /// Internal helper that spawns the listener callback tasks for `emit`
    pub fn emit_spawn<K: EventKey>(inner: Arc<Inner<K>>, event: K) -> EventResult<BoxFuture<usize, Trace<EventError>>> {
        if let Some(listeners_lock) = try_throw!(inner.events.read()).get(&event) {
            let listeners = try_throw!(listeners_lock.read());

            // Don't bother if there aren't any listeners to invoke anyway
            if listeners.len() > 0 {
                let mut listener_futures = Vec::with_capacity(listeners.len());

                for listener in listeners.iter().cloned() {
                    let listener_future = inner.pool.spawn_fn(move || -> EventResult<bool> {
                        let mut cb_guard = try_throw!(listener.cb.lock());

                        // Force a mutable reference to the callback
                        (&mut *cb_guard)(listener.id, None)
                    });

                    listener_futures.push(listener_future);
                }

                return Ok(future::join_all(listener_futures).map(count_ran).boxed());
            }
        }

        Ok(futures::finished(0).boxed())
    }

    /// Internal helper that spawns the listener callback tasks for `emit_value`
    pub fn emit_value_spawn<T, K: EventKey>(inner: Arc<Inner<K>>, event: K, value: T) -> EventResult<BoxFuture<usize, Trace<EventError>>> where T: Any + Clone + Send {
        if let Some(listeners_lock) = try_throw!(inner.events.read()).get(&event) {
            let listeners = try_throw!(listeners_lock.read());

            // Don't bother if there aren't any listeners to invoke anyway
            if listeners.len() > 0 {
                let mut listener_futures = Vec::with_capacity(listeners.len());

                for listener in listeners.iter().cloned() {
                    // Clone a local copy of value that can be sent to the listener
                    let value = value.clone();

                    let listener_future = inner.pool.spawn_fn(move || -> EventResult<bool> {
                        let mut cb_guard = try_throw!(listener.cb.lock());

                        // Force a mutable reference to the callback
                        (&mut *cb_guard)(listener.id, Some(ArcCowish::Owned(Box::new(value))))
                    });

                    listener_futures.push(listener_future);
                }

                return Ok(future::join_all(listener_futures).map(count_ran).boxed());
            }
        }

        Ok(futures::finished(0).boxed())
    }

    /// Internal helper that spawns the listener callback tasks for `emit_value_sync`
    pub fn emit_value_sync_spawn<T, K: EventKey>(inner: Arc<Inner<K>>, event: K, value: T) -> EventResult<BoxFuture<usize, Trace<EventError>>> where T: Any + Send + Sync {
        if let Some(listeners_lock) = try_throw!(inner.events.read()).get(&event) {
            let listeners = try_throw!(listeners_lock.read());

            // Don't bother if there aren't any listeners to invoke anyway
            if listeners.len() > 0 {
                let mut listener_futures = Vec::with_capacity(listeners.len());

                // We know T is Send, and Box<Any + Send> is really just Box<T>, so it is Send as well
                #[derive(Clone)]
                struct SendWrapper {
                    inner: Arc<Box<Any + Send>>
                }

                unsafe impl Send for SendWrapper {}

                let wrapper = SendWrapper { inner: Arc::new(Box::new(value)) };

                for listener in listeners.iter().cloned() {
                    // Clone the wrapper to send across the thread boundary
                    let wrapper = wrapper.clone();

                    let listener_future = inner.pool.spawn_fn(move || -> EventResult<bool> {
                        let mut cb_guard = try_throw!(listener.cb.lock());

                        // Force a mutable reference to the callback
                        (&mut *cb_guard)(listener.id, Some(ArcCowish::Borrowed(wrapper.inner)))
                    });

                    listener_futures.push(listener_future);
                }

                return Ok(future::join_all(listener_futures).map(count_ran).boxed());
            }
        }

        Ok(futures::finished(0).boxed())
    }
}

use internal::{Inner, ArcCowish, SyncEventListener};

/// Integer type used to represent unique listener ids
pub type ListenerId = u64;

/// Parallel Event Emitter
///
/// Listeners added to the emitter will be invoked in a thread pool concurrently.
pub struct ParallelEventEmitter<K: EventKey = String> {
    inner: Arc<internal::Inner<K>>,
}

impl<K: EventKey> Default for ParallelEventEmitter<K> {
    fn default() -> ParallelEventEmitter<K> {
        ParallelEventEmitter::new()
    }
}

impl<K: EventKey> ParallelEventEmitter<K> {
    /// Creates a new `ParallelEventEmitter` with the default `CpuPool`
    pub fn new() -> ParallelEventEmitter<K> {
        ParallelEventEmitter::with_pool(CpuPool::new_num_cpus())
    }

    /// Creates a new `ParallelEventEmitter` with an already existing `CpuPool` instance.
    ///
    /// This allows for custom thread preferences and lifecycle hooks.
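    ///
    /// A minimal sketch, assuming a four-thread pool is desired:
    ///
    /// ```ignore
    /// use futures_cpupool::CpuPool;
    /// use parallel_event_emitter::ParallelEventEmitter;
    ///
    /// // Listener callbacks will be dispatched on four worker threads
    /// let pool = CpuPool::new(4);
    ///
    /// let emitter: ParallelEventEmitter<String> = ParallelEventEmitter::with_pool(pool);
    /// ```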
    pub fn with_pool(pool: CpuPool) -> ParallelEventEmitter<K> {
        ParallelEventEmitter {
            inner: Arc::new(Inner {
                events: RwLock::new(FnvHashMap::default()),
                counter: internal::AtomicListenerId::new(0),
                pool: pool,
            })
        }
    }

    /// Collect the names of all events being listened for.
    ///
    /// Unfortunately, this method acquires a read lock on an internal structure,
    /// so an iterator cannot be returned.
    pub fn event_names(&self) -> EventResult<Vec<K>> {
        let guard = try_throw!(self.inner.events.read());

        Ok(guard.keys().cloned().collect())
    }

    /// As an alternative to cloning all the event names and collecting them into a `Vec`,
    /// like in `event_names`,
    /// a visitor callback can be used to iterate all the event names more efficiently.
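    ///
    /// A minimal sketch, assuming a `String`-keyed emitter with listeners already registered:
    ///
    /// ```ignore
    /// // Print every event name without cloning the keys
    /// emitter.event_names_visitor(|name| println!("{}", name)).unwrap();
    /// ```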
    pub fn event_names_visitor<F>(&self, visitor: F) -> EventResult<()> where F: Fn(&K) {
        let guard = try_throw!(self.inner.events.read());

        for key in guard.keys() {
            visitor(key);
        }

        Ok(())
    }

    /// Internal function that takes care of the listener vectors, generating new listener ids and inserting the new listener
    fn add_listener_impl<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<bool> + 'static {
        match try_throw!(self.inner.events.write()).entry(event) {
            Entry::Occupied(listeners_lock) => {
                let mut listeners = try_throw!(listeners_lock.get().write());

                let id = self.inner.counter.fetch_add(1, Ordering::Relaxed) as ListenerId;

                listeners.push(SyncEventListener::new(id, Box::new(cb)));

                Ok(id)
            },
            Entry::Vacant(vacant) => {
                let mut listeners = Vec::with_capacity(1);

                let id = self.inner.counter.fetch_add(1, Ordering::Relaxed) as ListenerId;

                listeners.push(SyncEventListener::new(id, Box::new(cb)));

                vacant.insert(Arc::new(RwLock::new(listeners)));

                Ok(id)
            }
        }
    }

    /// Simple wrapper around `add_listener_impl` that automatically maps the callback result through `ran`
    #[inline]
    fn add_listener_impl_simple<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<()> + 'static {
        self.add_listener_impl(event, move |id, arg| cb(id, arg).map(internal::ran))
    }

    /// Internal function that takes care of removing the listener for `once` listener callbacks
    fn once_impl<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<()> + 'static {
        // A weak reference is used so that the self-reference from within the listener table doesn't create a reference cycle
        let inner_weak = Arc::downgrade(&self.inner);

        self.add_listener_impl(event.clone(), move |id, arg| -> EventResult<bool> {
            let inner = inner_weak.upgrade().expect("Listener invoked after owning ParallelEventEmitter was dropped");

            let mut events = try_throw!(inner.events.write());

            // Perform the removal before the callback is invoked, so that if it panics or takes a long time to complete it will have already been removed.
            match events.entry(event.clone()) {
                Entry::Occupied(listeners_lock) => {
                    let mut listeners = try_throw!(listeners_lock.get().write());

                    if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
                        listeners.remove(index);
                    } else {
                        // If the listener has already been removed in the short time between emitting and this,
                        // just forget we were here and return ok.
                        return Ok(false);
                    }
                }
                Entry::Vacant(_) => {
                    // If the listener has already been removed in the short time between emitting and this,
                    // just forget we were here and return ok.
                    return Ok(false);
                }
            }

            // Free the lock before invoking the callback
            drop(events);

            cb(id, arg).map(internal::ran)
        })
    }

    /// Add a simple listener callback that does not accept any arguments
    ///
    /// The return value of this is a unique ID for that listener, which can later be used to remove it if desired.
    #[inline]
    pub fn add_listener<F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where F: Fn() -> EventResult<()> + 'static {
        self.add_listener_impl_simple(event.into(), move |_, _| -> EventResult<()> { cb() })
    }

    /// Like `add_listener`, but the listener will be removed from the event emitter after a single invocation.
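    ///
    /// A minimal sketch, assuming a `String`-keyed emitter and `futures::Future` in scope:
    ///
    /// ```ignore
    /// emitter.once("startup", || {
    ///     println!("invoked at most once");
    ///
    ///     Ok(())
    /// }).unwrap();
    ///
    /// // The first emit invokes the listener and removes it, so the second finds nothing to run
    /// assert_eq!(1, emitter.emit("startup").wait().unwrap());
    /// assert_eq!(0, emitter.emit("startup").wait().unwrap());
    /// ```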
    #[inline]
    pub fn once<F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where F: Fn() -> EventResult<()> + 'static {
        self.once_impl(event.into(), move |_, _| cb())
    }

    /// Add a listener that accepts a value passed via `emit_value`, or via `emit_value_sync` since `T` is `Clone`
    ///
    /// If no value or an incompatible value was passed to `emit*`, `None` is passed.
    ///
    /// The return value of this is a unique ID for that listener, which can later be used to remove it if desired.
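    ///
    /// A minimal sketch, assuming a `String`-keyed emitter, an `i32` payload and `futures::Future` in scope:
    ///
    /// ```ignore
    /// emitter.add_listener_value("data", |arg: Option<i32>| {
    ///     println!("received {:?}", arg);
    ///
    ///     Ok(())
    /// }).unwrap();
    ///
    /// // Each listener receives its own clone of the value
    /// assert_eq!(1, emitter.emit_value("data", 42).wait().unwrap());
    /// ```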
    #[inline]
    pub fn add_listener_value<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
        self.add_listener_impl_simple(event.into(), move |_, arg| internal::invoke_value_cb::<T, F>(arg, &cb))
    }

    /// Like `add_listener_value`, but the listener will be removed from the event emitter after a single invocation.
    #[inline]
    pub fn once_value<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
        self.once_impl(event.into(), move |_, arg| internal::invoke_value_cb::<T, F>(arg, &cb))
    }

    /// Variation of `add_listener_value` that accepts `Sync` types,
    /// where intermediate copies on `emit*` are unnecessary.
    ///
    /// This will attempt to use a reference to the original
    /// value passed to `emit_value_sync`. If a value of `T` was passed via `emit_value` instead,
    /// the callback will be invoked with a reference to that listener's cloned copy.
    ///
    /// There is nothing statically forcing the use of this instead of `add_listener_value`,
    /// but it is here just in case your type `T` is `Sync` but might not implement `Clone`,
    /// or if you want to avoid cloning values all over the place.
    ///
    /// The return value of this is a unique ID for that listener, which can later be used to remove it if desired.
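    ///
    /// A minimal sketch, assuming a `String`-keyed emitter, a shared `String` payload and `futures::Future` in scope:
    ///
    /// ```ignore
    /// emitter.add_listener_sync("config", |arg: Option<&String>| {
    ///     println!("received {:?}", arg);
    ///
    ///     Ok(())
    /// }).unwrap();
    ///
    /// // Every listener receives a reference to the same value; no clones are made
    /// assert_eq!(1, emitter.emit_value_sync("config", "loaded".to_string()).wait().unwrap());
    /// ```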
    #[inline]
    pub fn add_listener_sync<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
        self.add_listener_impl_simple(event.into(), move |_, arg| internal::invoke_sync_cb::<T, F>(arg, &cb))
    }

    /// Like `add_listener_sync`, but the listener will be removed from the event emitter after a single invocation.
    #[inline]
    pub fn once_sync<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
        self.once_impl(event.into(), move |_, arg| internal::invoke_sync_cb::<T, F>(arg, &cb))
    }

    /// Removes the listener with the given ID that is associated with the given event.
    ///
    /// If the listener was not found (either it does not exist or the wrong event was given), `Ok(false)` is returned.
    ///
    /// If the listener was removed, `Ok(true)` is returned.
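    ///
    /// A minimal sketch, assuming a `String`-keyed emitter:
    ///
    /// ```ignore
    /// let id = emitter.add_listener("tick", || Ok(())).unwrap();
    ///
    /// assert_eq!(true, emitter.remove_listener("tick", id).unwrap());
    /// // The listener is already gone, so a second removal finds nothing
    /// assert_eq!(false, emitter.remove_listener("tick", id).unwrap());
    /// ```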
    pub fn remove_listener<E: Into<K>>(&mut self, event: E, id: ListenerId) -> EventResult<bool> {
        if let Some(listeners_lock) = try_throw!(self.inner.events.read()).get(&event.into()) {
            let mut listeners = try_throw!(listeners_lock.write());

            // Since ids only increase in value, and listeners are always added to the end of the vector,
            // we can use a binary search for efficiency.
            if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
                listeners.remove(index);

                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Exhaustively searches through ALL events for a listener with the given ID.
    ///
    /// `Ok(false)` is returned if it was not found.
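    ///
    /// A minimal sketch, assuming a `String`-keyed emitter:
    ///
    /// ```ignore
    /// let id = emitter.add_listener("tick", || Ok(())).unwrap();
    ///
    /// // No event name is needed; every event's listener list is searched
    /// assert_eq!(true, emitter.remove_any_listener(id).unwrap());
    /// ```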
    pub fn remove_any_listener(&mut self, id: ListenerId) -> EventResult<bool> {
        for listeners_lock in try_throw!(self.inner.events.read()).values() {
            let mut listeners = try_throw!(listeners_lock.write());

            // Since ids only increase in value, and listeners are always added to the end of the vector,
            // we can use a binary search for efficiency.
            if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
                listeners.remove(index);

                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
    ///
    /// The `Future` returned by `emit` resolves to the number of listeners invoked,
    /// and any listener errors are propagated to the caller.
    #[cfg(feature = "conservative_impl_trait")]
    pub fn emit<E: Into<K>>(&mut self, event: E) -> impl Future<Item = usize, Error = Trace<EventError>> {
        let event = event.into();
        let inner = self.inner.clone();

        self.inner.pool.spawn_fn(move || internal::emit_spawn(inner, event)).flatten()
    }

    /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
    ///
    /// The `Future` returned by `emit` resolves to the number of listeners invoked,
    /// and any listener errors are propagated to the caller.
    #[cfg(not(feature = "conservative_impl_trait"))]
    pub fn emit<E: Into<K>>(&mut self, event: E) -> BoxFuture<usize, Trace<EventError>> {
        let event = event.into();
        let inner = self.inner.clone();

        self.inner.pool.spawn_fn(move || internal::emit_spawn(inner, event)).flatten().boxed()
    }

    /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
    ///
    /// A copy of the value will be passed to every listener.
    ///
    /// The `Future` returned by `emit_value` resolves to the number of listeners invoked,
    /// and any listener errors are propagated to the caller.
    #[cfg(feature = "conservative_impl_trait")]
    pub fn emit_value<T, E: Into<K>>(&mut self, event: E, value: T) -> impl Future<Item = usize, Error = Trace<EventError>> where T: Any + Clone + Send {
        let event = event.into();
        let inner = self.inner.clone();

        self.inner.pool.spawn_fn(move || internal::emit_value_spawn(inner, event, value)).flatten()
    }

    /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
    ///
    /// A copy of the value will be passed to every listener.
    ///
    /// The `Future` returned by `emit_value` resolves to the number of listeners invoked,
    /// and any listener errors are propagated to the caller.
    #[cfg(not(feature = "conservative_impl_trait"))]
    pub fn emit_value<T, E: Into<K>>(&mut self, event: E, value: T) -> BoxFuture<usize, Trace<EventError>> where T: Any + Clone + Send {
        let event = event.into();
        let inner = self.inner.clone();

        self.inner.pool.spawn_fn(move || internal::emit_value_spawn(inner, event, value)).flatten().boxed()
    }

    /// Variation of `emit_value` for `Sync` types, where intermediate copies are unnecessary.
    ///
    /// All listeners receive a reference to the same value.
    ///
    /// The `Future` returned by `emit_value_sync` resolves to the number of listeners invoked,
    /// and any listener errors are propagated to the caller.
    #[cfg(feature = "conservative_impl_trait")]
    pub fn emit_value_sync<T, E: Into<K>>(&mut self, event: E, value: T) -> impl Future<Item = usize, Error = Trace<EventError>> where T: Any + Send + Sync {
        let event = event.into();
        let inner = self.inner.clone();

        self.inner.pool.spawn_fn(move || internal::emit_value_sync_spawn(inner, event, value)).flatten()
    }

    /// Variation of `emit_value` for `Sync` types, where intermediate copies are unnecessary.
    ///
    /// All listeners receive a reference to the same value.
    ///
    /// The `Future` returned by `emit_value_sync` resolves to the number of listeners invoked,
    /// and any listener errors are propagated to the caller.
    #[cfg(not(feature = "conservative_impl_trait"))]
    pub fn emit_value_sync<T, E: Into<K>>(&mut self, event: E, value: T) -> BoxFuture<usize, Trace<EventError>> where T: Any + Send + Sync {
        let event = event.into();
        let inner = self.inner.clone();

        self.inner.pool.spawn_fn(move || internal::emit_value_sync_spawn(inner, event, value)).flatten().boxed()
    }
}