1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
use crate::prelude::*;

/// Newtype wrapper around an observable `R`.
///
/// Values of this type are produced by [`SharedObservable::into_shared`].
/// When `R` implements [`SharedObservable`], the wrapper implements it too
/// and forwards `actual_subscribe` to the wrapped observable, so observers
/// subscribed through it must be `Send + Sync + 'static`.
///
/// The inner value is only reachable from within this crate.
#[derive(Clone)]
pub struct Shared<R>(pub(crate) R);

/// An [`Observable`] that accepts observers which are
/// `Send + Sync + 'static`.
pub trait SharedObservable: Observable {
  /// Handle type returned by [`SharedObservable::actual_subscribe`].
  type Unsub: SubscriptionLike + 'static;

  /// Consumes the observable and attaches `subscriber` to it.
  ///
  /// `subscriber` pairs the observer `O` with a `SharedSubscription`; the
  /// observer's `Item` and `Err` types must match those of this observable.
  /// Returns the implementor's [`SharedObservable::Unsub`] handle.
  fn actual_subscribe<O>(
    self,
    subscriber: Subscriber<O, SharedSubscription>,
  ) -> Self::Unsub
  where
    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static;

  /// Wraps this observable in [`Shared`], consuming it.
  ///
  /// Only available on sized implementors, since `self` is moved into the
  /// wrapper by value.
  #[inline]
  fn into_shared(self) -> Shared<Self>
  where
    Self: Sized,
  {
    Shared(self)
  }
}

/// An [`Emitter`] that can emit into a subscriber whose observer is
/// `Send + Sync + 'static`.
pub trait SharedEmitter: Emitter {
  /// Consumes the emitter and emits into `subscriber`.
  ///
  /// `subscriber` pairs the observer `O` with a `SharedSubscription`; the
  /// observer's `Item` and `Err` types must match those of this emitter.
  /// What is emitted, and when, is defined by each implementor.
  fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
  where
    O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static;
}

// NOTE(review): presumably generates the `Observable` impl for `Shared<S>` by
// proxying to the wrapped `S` (the `SharedObservable` impl for `Shared<S>`
// relies on `Self::Item` / `Self::Err` being available) — confirm against the
// macro definition.
observable_proxy_impl!(Shared, S);

impl<S> SharedObservable for Shared<S>
where
  S: SharedObservable,
{
  type Unsub = S::Unsub;
  #[inline]
  fn actual_subscribe<
    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
  >(
    self,
    subscriber: Subscriber<O, SharedSubscription>,
  ) -> Self::Unsub {
    self.0.actual_subscribe(subscriber)
  }
}