rxrust/observable/
base.rs1use crate::prelude::*;
2
3pub trait Emitter {
4 type Item;
5 type Err;
6}
7
8pub trait LocalEmitter<'a>: Emitter {
9 fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
10 where
11 O: Observer<Item = Self::Item, Err = Self::Err> + 'a;
12}
13
14#[derive(Clone)]
15pub struct ObservableBase<Emit>(Emit);
16
17impl<Emit> ObservableBase<Emit> {
18 pub fn new(emitter: Emit) -> Self { ObservableBase(emitter) }
19}
20
21#[doc(hidden)]
22macro_rules! observable_impl {
23 ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
24 fn actual_subscribe<O>(
25 self,
26 subscriber: Subscriber<O, $subscription>,
27 ) -> Self::Unsub
28 where O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf {
29 let subscription = subscriber.subscription.clone();
30 self.0.emit(subscriber);
31 subscription
32 }
33}
34}
35
36impl<Emit> Observable for ObservableBase<Emit>
37where
38 Emit: Emitter,
39{
40 type Item = Emit::Item;
41 type Err = Emit::Err;
42}
43
44impl<'a, Emit> LocalObservable<'a> for ObservableBase<Emit>
45where
46 Emit: LocalEmitter<'a>,
47{
48 type Unsub = LocalSubscription;
49 observable_impl!(LocalSubscription, 'a);
50}
51
52impl<Emit> SharedObservable for ObservableBase<Emit>
53where
54 Emit: SharedEmitter,
55{
56 type Unsub = SharedSubscription;
57 observable_impl!(SharedSubscription, Send + Sync + 'static);
58}