rxrust/observable/
base.rs

1use crate::prelude::*;
2
/// Names the item and error types associated with an emitter.
///
/// The trait has no methods of its own. `LocalEmitter` extends it and requires
/// that observers passed to `emit` declare the same `Item` and `Err`.
pub trait Emitter {
  /// Item type; observers handed to `emit` must use the same `Item`.
  type Item;
  /// Error type; observers handed to `emit` must use the same `Err`.
  type Err;
}
7
8pub trait LocalEmitter<'a>: Emitter {
9  fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
10  where
11    O: Observer<Item = Self::Item, Err = Self::Err> + 'a;
12}
13
/// Newtype wrapper around an emitter value.
///
/// The `Observable`, `LocalObservable` and `SharedObservable` impls in this
/// file are written against this wrapper; the subscribe impls call `emit` on
/// the wrapped value.
#[derive(Clone)]
pub struct ObservableBase<Emit>(Emit);

impl<Emit> ObservableBase<Emit> {
  /// Wraps `emitter` in a new `ObservableBase`.
  pub fn new(emitter: Emit) -> Self { Self(emitter) }
}
20
// Expands to an `actual_subscribe` method for a tuple wrapper whose field `0`
// provides `emit`; used by the `ObservableBase` impls in this file.
//
// Arguments: the subscription type carried by the subscriber, then the extra
// bounds placed on the observer — zero or more marker traits, each written
// with a trailing `+`, followed by exactly one lifetime.
#[doc(hidden)]
macro_rules! observable_impl {
  ($sub_ty:ty, $($bound:ident +)* $life:lifetime) => {
    fn actual_subscribe<O>(self, subscriber: Subscriber<O, $sub_ty>) -> Self::Unsub
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($bound +)* $life,
    {
      // `emit` takes the subscriber by value, so clone the subscription that
      // will be returned before handing the subscriber over.
      let handle = subscriber.subscription.clone();
      self.0.emit(subscriber);
      handle
    }
  };
}
35
36impl<Emit> Observable for ObservableBase<Emit>
37where
38  Emit: Emitter,
39{
40  type Item = Emit::Item;
41  type Err = Emit::Err;
42}
43
44impl<'a, Emit> LocalObservable<'a> for ObservableBase<Emit>
45where
46  Emit: LocalEmitter<'a>,
47{
48  type Unsub = LocalSubscription;
49  observable_impl!(LocalSubscription, 'a);
50}
51
52impl<Emit> SharedObservable for ObservableBase<Emit>
53where
54  Emit: SharedEmitter,
55{
56  type Unsub = SharedSubscription;
57  observable_impl!(SharedSubscription, Send + Sync + 'static);
58}