1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
extern crate futures; extern crate tokio; use futures::prelude::*; use crate::spi::{Publisher, Subscriber}; use std::marker::PhantomData; use std::thread; const THREAD_NAME: &str = "rx"; pub trait Scheduler { type Item; type Error; fn schedule<P, S>(&self, publisher: P, subscriber: S) where Self: Sized, P: 'static + Send + Sized + Publisher<Item = Self::Item, Error = Self::Error>, S: 'static + Send + Sized + Subscriber<Item = Self::Item, Error = Self::Error>; } pub fn immediate<T, E>() -> impl Scheduler<Item = T, Error = E> { ImmediateScheduler::new() } pub fn new_thread<T, E>() -> impl Scheduler<Item = T, Error = E> where T: 'static, E: 'static, { NewThreadScheduler::new() } pub fn tokio<T, E>() -> impl Scheduler<Item = T, Error = E> { TkScheduler::new() } struct ImmediateScheduler<T, E> { _t: PhantomData<T>, _e: PhantomData<E>, } impl<T, E> ImmediateScheduler<T, E> { pub(crate) fn new() -> ImmediateScheduler<T, E> { ImmediateScheduler { _t: PhantomData, _e: PhantomData, } } } impl<T, E> Scheduler for ImmediateScheduler<T, E> { type Item = T; type Error = E; fn schedule<P, S>(&self, publisher: P, subscriber: S) where P: 'static + Send + Sized + Publisher<Item = Self::Item, Error = Self::Error>, S: 'static + Send + Sized + Subscriber<Item = T, Error = E>, { publisher.subscribe(subscriber); } } struct TkScheduler<T, E> { _t: PhantomData<T>, _e: PhantomData<E>, } impl<T, E> TkScheduler<T, E> { fn new() -> TkScheduler<T, E> { TkScheduler { _t: PhantomData, _e: PhantomData, } } } impl<T, E> Scheduler for TkScheduler<T, E> { type Item = T; type Error = E; fn schedule<P, S>(&self, publisher: P, subscriber: S) where P: 'static + Send + Sized + Publisher<Item = T, Error = E>, S: 'static + Send + Sized + Subscriber<Item = T, Error = E>, { let fu = futures::future::ok::<(P, S), ()>((publisher, subscriber)).and_then(move |(p, s)| { p.subscribe(s); Ok(()) }); tokio::spawn(fu); } } struct NewThreadScheduler<T, E> { _t: PhantomData<T>, _e: PhantomData<E>, } impl<T, E> NewThreadScheduler<T, E> { pub(crate) fn new() -> NewThreadScheduler<T, E> { NewThreadScheduler { _t: PhantomData, _e: PhantomData, } } } impl<T, E> Scheduler for NewThreadScheduler<T, E> { type Item = T; type Error = E; fn schedule<P, S>(&self, publisher: P, subscriber: S) where P: 'static + Send + Sized + Publisher<Item = Self::Item, Error = Self::Error>, S: 'static + Send + Sized + Subscriber<Item = T, Error = E>, { thread::Builder::new() .name(String::from(THREAD_NAME)) .spawn(move || { publisher.subscribe(subscriber); }) .unwrap(); } }