parallel_event_emitter/lib.rs
1//! Parallel Event Emitter
2//!
3//! Implementation of an event emitter that invokes event listener callbacks concurrently in a configurable thread pool,
4//! using `Future`s to notify callers of success or errors.
5//!
6//! Because all values must be transferred across thread boundaries, all types `T` must be `Send`.
7//!
8//! Additionally, all types `T` must be `Any`, so `T: 'static`.
9//!
10//! ## Usage
11//!
12//! ```toml
13//! [dependencies]
14//! futures = "0.1"
15//! parallel-event-emitter = "0.2.4"
16//! ```
17//!
18//! Example using a `String` as the key:
19//!
20//! ```rust
21//! extern crate futures;
22//! extern crate parallel_event_emitter;
23//!
24//! use futures::Future;
25//! use parallel_event_emitter::*;
26//!
27//! fn main() {
28//! let mut emitter: ParallelEventEmitter<String> = ParallelEventEmitter::new();
29//!
30//! emitter.add_listener("some event", || {
31//! println!("Hello, World!");
32//!
33//! Ok(())
34//! }).unwrap();
35//!
36//! assert_eq!(1, emitter.emit("some event").wait().unwrap());
37//! }
38//! ```
39//!
40//! Or using a custom event type:
41//!
42//! ```rust
43//! extern crate futures;
44//! extern crate parallel_event_emitter;
45//!
46//! use futures::Future;
47//! use parallel_event_emitter::*;
48//!
49//! #[derive(Debug, Hash, PartialEq, Eq, Clone)]
50//! enum MyEvents {
51//! EventA,
52//! EventB,
53//! }
54//!
55//! fn main() {
56//! let mut emitter: ParallelEventEmitter<MyEvents> = ParallelEventEmitter::new();
57//!
58//! emitter.add_listener(MyEvents::EventA, || {
59//! println!("Hello, World!");
60//!
61//! Ok(())
62//! }).unwrap();
63//!
64//! assert_eq!(1, emitter.emit(MyEvents::EventA).wait().unwrap());
65//! }
66//! ```
67//!
68//! ## `Trace<E>` type
69//!
70//! This crate depends on the [`trace-error`](https://crates.io/crates/trace-error) crate to have simple and lightweight backtraces on all error `Result`s.
71//!
72//! If you choose not to use that, which is fine by me, simply call `.into_error()` on all `Trace<E>` values to get the real error.
73//!
74//! ## `impl Trait` feature
75//!
76//! Instead of having all the `emit*` methods returning a boxed `Future` (`BoxFuture`),
77//! the Cargo feature **`conservative_impl_trait`** can be given to enable `impl Future` return types on
78//! all the `emit*` methods.
79//!
80//! ```toml
81//! [dependencies.parallel-event-emitter]
82//! version = "0.2.4"
83//! features = ["default", "conservative_impl_trait"] # And maybe integer_atomics
84//! ```
85//!
86//! ## Larger `ListenerId`s
87//!
88//! Although the `ListenerId` type itself is `u64`,
89//! the atomic counter underneath is restricted to `AtomicUsize` by default.
90//!
91//! To enable true guaranteed 64-bit counters, use the `integer_atomics` feature for the crate.
92//!
93//! ```toml
94//! [dependencies.parallel-event-emitter]
95//! version = "0.2.4"
96//! features = ["default", "integer_atomics"] # And maybe conservative_impl_trait
97//! ```
98//!
99
100#![deny(missing_docs)]
101#![allow(unknown_lints)]
102
103#![cfg_attr(feature = "integer_atomics", feature(integer_atomics))]
104#![cfg_attr(feature = "conservative_impl_trait", feature(conservative_impl_trait))]
105
106extern crate fnv;
107
108#[macro_use]
109extern crate trace_error;
110extern crate futures;
111extern crate futures_cpupool;
112
113use std::any::Any;
114use std::sync::{Arc, RwLock};
115
116use std::sync::atomic::Ordering;
117
118use std::hash::Hash;
119use fnv::FnvHashMap;
120use std::collections::hash_map::Entry;
121
122use futures::Future;
123use futures_cpupool::CpuPool;
124
125// BoxFuture is unused if we use impl Trait instead
126#[cfg(not(feature = "conservative_impl_trait"))]
127use futures::BoxFuture;
128
129use trace_error::Trace;
130
131pub mod error;
132
133pub use self::error::*;
134
/// Marker trait for types that can identify an event.
///
/// It is never implemented by hand: the blanket implementation below covers every
/// type that is hashable, comparable for equality, clonable, `Send` and `'static`.
///
/// Example using an enum:
///
/// ```ignore
/// #[derive(Debug, Hash, PartialEq, Eq, Clone)]
/// enum MyEvents {
///     EventA,
///     EventB,
/// }
/// ```
pub trait EventKey: Hash + PartialEq + Eq + Clone + Send + 'static {}

// Blanket implementation: anything satisfying the bounds is automatically an `EventKey`.
impl<T: Hash + PartialEq + Eq + Clone + Send + 'static> EventKey for T {}
149
/// The `internal` module holds the crate's non-public functionality.
///
/// Keeping most of the implementation details in their own module keeps the public API surface easy to find.
153mod internal {
154 use std::any::Any;
155 use std::sync::{Arc, RwLock, Mutex};
156
157 use trace_error::Trace;
158
159 use fnv::FnvHashMap;
160
161 use futures::{self, future, Future, BoxFuture};
162 use futures_cpupool::CpuPool;
163
164 #[cfg(feature = "integer_atomics")]
165 pub use std::sync::atomic::AtomicU64 as AtomicListenerId;
166
167 #[cfg(not(feature = "integer_atomics"))]
168 pub use std::sync::atomic::AtomicUsize as AtomicListenerId;
169
170 use super::{ListenerId, EventError, EventResult, EventKey};
171
172 /// Helper function to map listener results with
173 #[allow(inline_always)]
174 #[inline(always)]
175 pub fn ran(_: ()) -> bool {
176 true
177 }
178
179 /// Helper function to count successfully ran listeners
180 pub fn count_ran(executed: Vec<bool>) -> usize {
181 executed.into_iter().filter(|ran| *ran).count()
182 }
183
    /// Type-erased event argument handed to listener callbacks.
    ///
    /// This allows callbacks to take both values and references depending on the situation
    pub enum ArcCowish {
        /// Owned value that can be moved out
        Owned(Box<Any + Send>),
        /// Shared value that must be passed by reference (or cloned by the receiver)
        Borrowed(Arc<Box<Any + Send>>),
    }

    // SAFETY (review note): `Arc<Box<Any + Send>>` is not `Send` on its own, because the
    // erased value is not known to be `Sync`. Within this file the only place that builds a
    // `Borrowed` value is `emit_value_sync_spawn`, which requires `T: Send + Sync`.
    // Any new construction site has to uphold the same bound.
    unsafe impl Send for ArcCowish {}
193
    /// This handler callback takes the listener id and the argument,
    /// and returns `Ok(true)` if the listener callback was invoked correctly.
    ///
    /// NOTE(review): the boxed closure type carries no `Send` bound, even though the
    /// `emit*_spawn` functions invoke it from tasks started with `CpuPool::spawn_fn`.
    pub type SyncCallback = Box<FnMut(ListenerId, Option<ArcCowish>) -> EventResult<bool>>;
197
    /// Stores the listener callback and its ID value
    ///
    /// Only the callback sits behind a lock, so the `id` field can be read without locking.
    pub struct SyncEventListener {
        /// Immutable unique ID of the listener
        pub id: ListenerId,
        /// Lockable callback. `SyncCallback` is an `FnMut`, so calling it needs mutable
        /// access; the `Mutex` provides that for listeners shared through an `Arc`.
        pub cb: Mutex<SyncCallback>,
    }

    // SAFETY (review note): `SyncCallback` has no `Send` bound, so the compiler would not
    // derive `Send`/`Sync` for this struct; both are asserted by hand here.
    // NOTE(review): the public `add_listener*`/`once*` methods only require
    // `Fn(..) + 'static` of their callbacks, so nothing visible here stops a non-`Send`
    // closure from being stored and later invoked on a pool thread — confirm this is intended.
    unsafe impl Send for SyncEventListener {}

    unsafe impl Sync for SyncEventListener {}
211
212 impl SyncEventListener {
213 /// Create a new `SyncEventListener` from an id and callback
214 pub fn new(id: ListenerId, cb: SyncCallback) -> Arc<SyncEventListener> {
215 Arc::new(SyncEventListener { id: id, cb: Mutex::new(cb) })
216 }
217 }
218
    /// Reference counted `SyncEventListener`
    ///
    /// This is the same type that `SyncEventListener::new` returns.
    pub type SyncEventListenerLock = Arc<SyncEventListener>;

    /// Reference counted and lockable listener vector
    ///
    /// One such vector is stored per event key in `Inner::events`.
    pub type SyncListenersLock = Arc<RwLock<Vec<SyncEventListenerLock>>>;
224
    /// Inner structure that can be referenced from within the threadpool and listener callbacks
    pub struct Inner<K: EventKey> {
        /// Event table, mapping each event key to its lockable listener vector
        pub events: RwLock<FnvHashMap<K, SyncListenersLock>>,
        /// Atomic counter that always increments; each new listener id is taken from it
        pub counter: AtomicListenerId,
        /// Threadpool to dispatch listener callbacks in
        pub pool: CpuPool,
    }

    // SAFETY (review note): `Send` and `Sync` are asserted by hand rather than derived.
    // `K` is `Send` through `EventKey`, but `EventKey` does not require `K: Sync`, while
    // shared references to the keys in `events` are used from pool threads
    // (see the `emit*_spawn` functions) — confirm this is acceptable for all key types.
    unsafe impl<K: EventKey> Send for Inner<K> {}

    unsafe impl<K: EventKey> Sync for Inner<K> {}
238
239 /// Internal function used in callbacks in `add_listener_value` and `once_value`
240 pub fn invoke_value_cb<T, F>(arg: Option<ArcCowish>, cb: &F) -> EventResult<()> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
241 if let Some(arg) = arg {
242 match arg {
243 ArcCowish::Borrowed(value) => {
244 // If the value is borrowed, but T is Clone, we can clone a unique value
245 if let Some(value) = value.downcast_ref::<T>() {
246 return cb(Some(value.clone()));
247 }
248 }
249 ArcCowish::Owned(value) => {
250 // If it's owned, we just dereference it directly.
251 if let Ok(value) = value.downcast::<T>() {
252 return cb(Some(*value));
253 }
254 }
255 }
256 }
257
258 cb(None)
259 }
260
261 /// Internal function used in callbacks in `add_listener_sync` and `once_sync`
262 pub fn invoke_sync_cb<T, F>(arg: Option<ArcCowish>, cb: &F) -> EventResult<()> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
263 if let Some(arg) = arg {
264 match arg {
265 ArcCowish::Borrowed(value) => {
266 // If the value is borrowed, use the reference to the original copy
267 if let Some(value) = value.downcast_ref::<T>() {
268 return cb(Some(&*value));
269 }
270 }
271 ArcCowish::Owned(value) => {
272 // If it's owned, do the same, but just use a reference to the local copy
273 if let Some(value) = value.downcast_ref::<T>() {
274 return cb(Some(&*value));
275 }
276 }
277 }
278 }
279
280 cb(None)
281 }
282
283 /// Internal function to facilitate in spawning the listener callback tasks for `emit`
284 pub fn emit_spawn<K: EventKey>(inner: Arc<Inner<K>>, event: K) -> EventResult<BoxFuture<usize, Trace<EventError>>> {
285 if let Some(listeners_lock) = try_throw!(inner.events.read()).get(&event) {
286 let listeners = try_throw!(listeners_lock.read());
287
288 // Don't bother if there aren't any listeners to invoke anyway
289 if listeners.len() > 0 {
290 let mut listener_futures = Vec::with_capacity(listeners.len());
291
292 for listener in listeners.iter().cloned() {
293 let listener_future = inner.pool.spawn_fn(move || -> EventResult<bool> {
294 let mut cb_guard = try_throw!(listener.cb.lock());
295
296 // Force a mutable reference to the callback
297 (&mut *cb_guard)(listener.id, None)
298 });
299
300 listener_futures.push(listener_future);
301 }
302
303 return Ok(future::join_all(listener_futures).map(count_ran).boxed());
304 }
305 }
306
307 Ok(futures::finished(0).boxed())
308 }
309
310 /// Internal function to facilitate in spawning the listener callback tasks for `emit_value`
311 pub fn emit_value_spawn<T, K: EventKey>(inner: Arc<Inner<K>>, event: K, value: T) -> EventResult<BoxFuture<usize, Trace<EventError>>> where T: Any + Clone + Send {
312 if let Some(listeners_lock) = try_throw!(inner.events.read()).get(&event) {
313 let listeners = try_throw!(listeners_lock.read());
314
315 // Don't bother if there aren't any listeners to invoke anyway
316 if listeners.len() > 0 {
317 let mut listener_futures = Vec::with_capacity(listeners.len());
318
319 for listener in listeners.iter().cloned() {
320 // Clone a local copy of value that can be sent to the listener
321 let value = value.clone();
322
323 let listener_future = inner.pool.spawn_fn(move || -> EventResult<bool> {
324 let mut cb_guard = try_throw!(listener.cb.lock());
325
326 // Force a mutable reference to the callback
327 (&mut *cb_guard)(listener.id, Some(ArcCowish::Owned(Box::new(value))))
328 });
329
330 listener_futures.push(listener_future);
331 }
332
333 return Ok(future::join_all(listener_futures).map(count_ran).boxed());
334 }
335 }
336
337 Ok(futures::finished(0).boxed())
338 }
339
    /// Internal function to facilitate in spawning the listener callback tasks for `emit_value_sync`
    ///
    /// `value` is boxed once and placed behind an `Arc`; every listener task receives a clone
    /// of that `Arc` as an `ArcCowish::Borrowed` argument, so the value itself is never cloned.
    /// The returned future resolves to the number of listeners that reported they ran,
    /// or to `0` immediately if the event is unknown or has no listeners.
    pub fn emit_value_sync_spawn<T, K: EventKey>(inner: Arc<Inner<K>>, event: K, value: T) -> EventResult<BoxFuture<usize, Trace<EventError>>> where T: Any + Send + Sync {
        if let Some(listeners_lock) = try_throw!(inner.events.read()).get(&event) {
            let listeners = try_throw!(listeners_lock.read());

            // Don't bother if there aren't any listeners to invoke anyway
            if listeners.len() > 0 {
                let mut listener_futures = Vec::with_capacity(listeners.len());

                // `Arc<Box<Any + Send>>` is not `Send` by itself, because the erased type is not
                // known to be `Sync`. This function bounds `T` by `Send + Sync`, so a wrapper that
                // asserts `Send` is used to move the `Arc` into the pool tasks.
                #[derive(Clone)]
                struct SendWrapper {
                    inner: Arc<Box<Any + Send>>
                }

                unsafe impl Send for SendWrapper {}

                let wrapper = SendWrapper { inner: Arc::new(Box::new(value)) };

                for listener in listeners.iter().cloned() {
                    // Clone the wrapper to send across the thread boundary
                    let wrapper = wrapper.clone();

                    // NOTE(review): the closure has to capture `wrapper` as a whole (not just
                    // `wrapper.inner`) for the `Send` assertion above to apply to it — confirm
                    // this still holds if the crate's edition or closure capture rules change.
                    let listener_future = inner.pool.spawn_fn(move || -> EventResult<bool> {
                        let mut cb_guard = try_throw!(listener.cb.lock());

                        // Force a mutable reference to the callback
                        (&mut *cb_guard)(listener.id, Some(ArcCowish::Borrowed(wrapper.inner)))
                    });

                    listener_futures.push(listener_future);
                }

                return Ok(future::join_all(listener_futures).map(count_ran).boxed());
            }
        }

        Ok(futures::finished(0).boxed())
    }
379}
380
381use internal::{Inner, ArcCowish, SyncEventListener};
382
/// Integer type used to represent unique listener ids
///
/// Ids are taken from an internal atomic counter (`AtomicUsize` by default,
/// `AtomicU64` with the `integer_atomics` feature) and cast to this type.
pub type ListenerId = u64;
385
/// Parallel Event Emitter
///
/// Listeners added to the emitter will be invoked in a thread pool concurrently.
///
/// The event key type `K` defaults to `String`.
pub struct ParallelEventEmitter<K: EventKey = String> {
    // Shared state (event table, id counter and thread pool). It sits behind an `Arc`
    // so the `emit*` methods can hand a clone of it to the task they spawn on the pool.
    inner: Arc<internal::Inner<K>>,
}
392
393impl<K: EventKey> Default for ParallelEventEmitter<K> {
394 fn default() -> ParallelEventEmitter<K> {
395 ParallelEventEmitter::new()
396 }
397}
398
399impl<K: EventKey> ParallelEventEmitter<K> {
400 /// Creates a new `ParallelEventEmitter` with the default `CpuPool`
401 pub fn new() -> ParallelEventEmitter<K> {
402 ParallelEventEmitter::with_pool(CpuPool::new_num_cpus())
403 }
404
405 /// Creates a new `ParallelEventEmitter` with an already existing `CpuPool` instance.
406 ///
407 /// This allows for custom thread preferences and lifecycle hooks.
408 pub fn with_pool(pool: CpuPool) -> ParallelEventEmitter<K> {
409 ParallelEventEmitter {
410 inner: Arc::new(Inner {
411 events: RwLock::new(FnvHashMap::default()),
412 counter: internal::AtomicListenerId::new(0),
413 pool: pool,
414 })
415 }
416 }
417
418 /// Collect the names of all events being listened for.
419 ///
420 /// Unfortunately, this method locks a mutex on an internal structure,
421 /// so an iterator cannot be returned.
422 pub fn event_names(&self) -> EventResult<Vec<K>> {
423 let guard = try_throw!(self.inner.events.read());
424
425 Ok(guard.keys().cloned().collect())
426 }
427
428 /// As an alternative to cloning all the event names and collecting them into a `Vec`,
429 /// like in `event_names`,
430 /// a visitor callback can be used to iterate all the event names more efficiently.
431 pub fn event_names_visitor<F>(&self, visitor: F) -> EventResult<()> where F: Fn(&K) {
432 let guard = try_throw!(self.inner.events.read());
433
434 for key in guard.keys() {
435 visitor(key);
436 }
437
438 Ok(())
439 }
440
441 /// Internal function that takes care of the listener vectors, generating new listener ids and inserting the new listener
442 fn add_listener_impl<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<bool> + 'static {
443 match try_throw!(self.inner.events.write()).entry(event) {
444 Entry::Occupied(listeners_lock) => {
445 let mut listeners = try_throw!(listeners_lock.get().write());
446
447 let id = self.inner.counter.fetch_add(1, Ordering::Relaxed) as ListenerId;
448
449 listeners.push(SyncEventListener::new(id, Box::new(cb)));
450
451 Ok(id)
452 },
453 Entry::Vacant(vacant) => {
454 let mut listeners = Vec::with_capacity(1);
455
456 let id = self.inner.counter.fetch_add(1, Ordering::Relaxed) as ListenerId;
457
458 listeners.push(SyncEventListener::new(id, Box::new(cb)));
459
460 vacant.insert(Arc::new(RwLock::new(listeners)));
461
462 Ok(id)
463 }
464 }
465 }
466
467 /// Simple wrapper around `add_listener_impl` that automatically maps the result to `ran`
468 #[inline]
469 fn add_listener_impl_simple<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<()> + 'static {
470 self.add_listener_impl(event, move |id, arg| cb(id, arg).map(internal::ran))
471 }
472
473 /// Internal function that takes care of removing the listener for `once` listener callbacks
474 fn once_impl<F>(&mut self, event: K, cb: F) -> EventResult<ListenerId> where F: Fn(ListenerId, Option<ArcCowish>) -> EventResult<()> + 'static {
475 // A weak reference is used so that the self-reference from with the listener table doesn't create a circular reference
476 let inner_weak = Arc::downgrade(&self.inner);
477
478 self.add_listener_impl(event.clone(), move |id, arg| -> EventResult<bool> {
479 let inner = inner_weak.upgrade().expect("Listener invoked after owning ParallelEventEmitter was dropped");
480
481 let mut events = try_throw!(inner.events.write());
482
483 // Perform the removal before the callback is invoked, so in case it panics or takes a long time to complete it will have already been removed.
484 match events.entry(event.clone()) {
485 Entry::Occupied(listeners_lock) => {
486 let mut listeners = try_throw!(listeners_lock.get().write());
487
488 if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
489 listeners.remove(index);
490 } else {
491 // If the listener has already been removed in the short time between emitting and this,
492 // just forget we were here and return ok.
493 return Ok(false);
494 }
495 }
496 Entry::Vacant(_) => {
497 // If the listener has already been removed in the short time between emitting and this,
498 // just forget we were here and return ok.
499 return Ok(false);
500 }
501 }
502
503 // Free the lock before invoking the callback
504 drop(events);
505
506 cb(id, arg).map(internal::ran)
507 })
508 }
509
510 /// Add a simple listener callback that does not accept any arguments
511 ///
512 /// The return value of this is a unique ID for that listener, which can later be used to remove it if desired.
513 #[inline]
514 pub fn add_listener<F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where F: Fn() -> EventResult<()> + 'static {
515 self.add_listener_impl_simple(event.into(), move |_, _| -> EventResult<()> { cb() })
516 }
517
518 /// Like `add_listener`, but the listener will be removed from the event emitter after a single invocation.
519 #[inline]
520 pub fn once<F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where F: Fn() -> EventResult<()> + 'static {
521 self.once_impl(event.into(), move |_, _| cb())
522 }
523
524 /// Add a listener that can accept a value passed via `emit_value`, or `emit_value_sync` if `T` is `Clone`
525 ///
526 /// If no value or an incompatible value was passed to `emit*`, `None` is passed.
527 ///
528 /// The return value of this is a unique ID for that listener, which can later be used to remove it if desired.
529 #[inline]
530 pub fn add_listener_value<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
531 self.add_listener_impl_simple(event.into(), move |_, arg| internal::invoke_value_cb::<T, F>(arg, &cb))
532 }
533
534 /// Like `add_listener_value`, but the listener will be removed from the event emitter after a single invocation.
535 #[inline]
536 pub fn once_value<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Clone + Send, F: Fn(Option<T>) -> EventResult<()> + 'static {
537 self.once_impl(event.into(), move |_, arg| internal::invoke_value_cb::<T, F>(arg, &cb))
538 }
539
540 /// Variation of `add_listener_value` that accepts `Sync` types,
541 /// where intermediate copies on `emit*` are unnecessary.
542 ///
543 /// This will attempt to use a reference to the original
544 /// value passed to `emit_value_sync`. If a value of `T` was passed via `emit_value`,
545 /// the callback will be invoked with the `Clone`d copy.
546 ///
547 /// There is nothing statically forcing the use of this instead of `add_listener_value`,
548 /// but it is here just in case your type `T` is `Sync` but might not implement `Clone`,
549 /// or if you want to avoid cloning values all over the place.
550 ///
551 /// The return value of this is a unique ID for that listener, which can later be used to remove it if desired.
552 #[inline]
553 pub fn add_listener_sync<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
554 self.add_listener_impl_simple(event.into(), move |_, arg| internal::invoke_sync_cb::<T, F>(arg, &cb))
555 }
556
557 /// Like `add_listener_sync`, but the listener will be removed from the event emitter after a single invocation.
558 #[inline]
559 pub fn once_sync<T, F, E: Into<K>>(&mut self, event: E, cb: F) -> EventResult<ListenerId> where T: Any + Send + Sync, F: Fn(Option<&T>) -> EventResult<()> + 'static {
560 self.once_impl(event.into(), move |_, arg| internal::invoke_sync_cb::<T, F>(arg, &cb))
561 }
562
563 /// Removes a listener with the given ID and associated with the given event.
564 ///
565 /// If the listener was not found (either doesn't exist or the wrong event given) `Ok(false)` is returned.
566 ///
567 /// If the listener was removed, `Ok(true)` is returned.
568 pub fn remove_listener<E: Into<K>>(&mut self, event: E, id: ListenerId) -> EventResult<bool> {
569 if let Some(listeners_lock) = try_throw!(self.inner.events.read()).get(&event.into()) {
570 let mut listeners = try_throw!(listeners_lock.write());
571
572 // Since ids only increase in value, and listeners are always added to the end of the vector,
573 // we can use a binary search for efficiency.
574 if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
575 listeners.remove(index);
576
577 return Ok(true);
578 }
579 }
580
581 Ok(false)
582 }
583
584 /// Exhaustively searches through ALL events for a listener with the given ID.
585 ///
586 /// `Ok(false)` is returned if it was not found.
587 pub fn remove_any_listener(&mut self, id: ListenerId) -> EventResult<bool> {
588 for listeners_lock in try_throw!(self.inner.events.read()).values() {
589 let mut listeners = try_throw!(listeners_lock.write());
590
591 // Since ids only increase in value, and listeners are always added to the end of the vector,
592 // we can use a binary search for efficiency.
593 if let Ok(index) = listeners.binary_search_by_key(&id, |listener| listener.id) {
594 listeners.remove(index);
595
596 return Ok(true);
597 }
598 }
599
600 Ok(false)
601 }
602
603 /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
604 ///
605 /// The `Future` returned by `emit` resolves to the number of listeners invoked,
606 /// and any errors should be forwarded up.
607 #[cfg(feature = "conservative_impl_trait")]
608 pub fn emit<E: Into<K>>(&mut self, event: E) -> impl Future<Item = usize, Error = Trace<EventError>> {
609 let event = event.into();
610 let inner = self.inner.clone();
611
612 self.inner.pool.spawn_fn(move || internal::emit_spawn(inner, event)).flatten()
613 }
614
615 /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
616 ///
617 /// The `Future` returned by `emit` resolves to the number of listeners invoked,
618 /// and any errors should be forwarded up.
619 #[cfg(not(feature = "conservative_impl_trait"))]
620 pub fn emit<E: Into<K>>(&mut self, event: E) -> BoxFuture<usize, Trace<EventError>> {
621 let event = event.into();
622 let inner = self.inner.clone();
623
624 self.inner.pool.spawn_fn(move || internal::emit_spawn(inner, event)).flatten().boxed()
625 }
626
627 /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
628 ///
629 /// A copy of the value will be passed to every listener.
630 ///
631 /// The `Future` returned by `emit_value` resolves to the number of listeners invoked,
632 /// and any errors should be forwarded up.
633 #[cfg(feature = "conservative_impl_trait")]
634 pub fn emit_value<T, E: Into<K>>(&mut self, event: E, value: T) -> impl Future<Item = usize, Error = Trace<EventError>> where T: Any + Clone + Send {
635 let event = event.into();
636 let inner = self.inner.clone();
637
638 self.inner.pool.spawn_fn(move || internal::emit_value_spawn(inner, event, value)).flatten()
639 }
640
641 /// Emit an event, invoking all the listeners for that event in the thread pool concurrently.
642 ///
643 /// A copy of the value will be passed to every listener.
644 ///
645 /// The `Future` returned by `emit_value` resolves to the number of listeners invoked,
646 /// and any errors should be forwarded up.
647 #[cfg(not(feature = "conservative_impl_trait"))]
648 pub fn emit_value<T, E: Into<K>>(&mut self, event: E, value: T) -> BoxFuture<usize, Trace<EventError>> where T: Any + Clone + Send {
649 let event = event.into();
650 let inner = self.inner.clone();
651
652 self.inner.pool.spawn_fn(move || internal::emit_value_spawn(inner, event, value)).flatten().boxed()
653 }
654
655 /// Variation of `emit_value` for `Sync` types, where intermediate copies are unnecessary.
656 ///
657 /// All listeners receive a reference to the same value.
658 ///
659 /// The `Future` returned by `emit_value_sync` resolves to the number of listeners invoked,
660 /// and any errors should be forwarded up.
661 #[cfg(feature = "conservative_impl_trait")]
662 pub fn emit_value_sync<T, E: Into<K>>(&mut self, event: E, value: T) -> impl Future<Item = usize, Error = Trace<EventError>> where T: Any + Send + Sync {
663 let event = event.into();
664 let inner = self.inner.clone();
665
666 self.inner.pool.spawn_fn(move || internal::emit_value_sync_spawn(inner, event, value)).flatten()
667 }
668
669 /// Variation of `emit_value` for `Sync` types, where intermediate copies are unnecessary.
670 ///
671 /// All listeners receive a reference to the same value.
672 ///
673 /// The `Future` returned by `emit_value_sync` resolves to the number of listeners invoked,
674 /// and any errors should be forwarded up.
675 #[cfg(not(feature = "conservative_impl_trait"))]
676 pub fn emit_value_sync<T, E: Into<K>>(&mut self, event: E, value: T) -> BoxFuture<usize, Trace<EventError>> where T: Any + Send + Sync {
677 let event = event.into();
678 let inner = self.inner.clone();
679
680 self.inner.pool.spawn_fn(move || internal::emit_value_sync_spawn(inner, event, value)).flatten().boxed()
681 }
682}