use crate::prelude::*;
/// Pairs an observer with the subscription that governs it.
///
/// The [`Observer`] implementation for this type forwards events to
/// `observer` only while `subscription` reports that it is still open.
///
/// `Clone` is derived, so a `Subscriber` is cloneable whenever both of its
/// type parameters are.
#[derive(Clone)]
pub struct Subscriber<O, U> {
    /// Downstream receiver of `next` / `error` / `complete` notifications.
    pub(crate) observer: O,
    /// Handle consulted before every notification and closed on termination.
    pub(crate) subscription: U,
}
impl<O> Subscriber<O, LocalSubscription> {
    /// Wraps `observer` together with a freshly created, default
    /// [`LocalSubscription`].
    ///
    /// NOTE(review): "local" presumably means the single-threaded flavour of
    /// subscription; confirm against `LocalSubscription`'s definition.
    pub fn local(observer: O) -> Self {
        let subscription = LocalSubscription::default();
        Self {
            observer,
            subscription,
        }
    }
}
impl<O> Subscriber<O, SharedSubscription> {
    /// Wraps `observer` together with a freshly created, default
    /// [`SharedSubscription`].
    ///
    /// NOTE(review): "shared" presumably means the thread-safe flavour of
    /// subscription; confirm against `SharedSubscription`'s definition.
    pub fn shared(observer: O) -> Self {
        let subscription = SharedSubscription::default();
        Self {
            observer,
            subscription,
        }
    }
}
/// Converts a `Subscriber` into its shared form by converting both halves.
impl<S, U> IntoShared for Subscriber<S, U>
where
    S: IntoShared,
    U: IntoShared,
{
    type Shared = Subscriber<S::Shared, U::Shared>;

    /// Consumes `self` and returns a `Subscriber` whose observer and
    /// subscription are the results of their own `to_shared` conversions.
    fn to_shared(self) -> Self::Shared {
        let Subscriber {
            observer,
            subscription,
        } = self;
        // The subscription is converted before the observer; keep this order
        // in case either conversion has observable side effects.
        let subscription = subscription.to_shared();
        let observer = observer.to_shared();
        Subscriber {
            observer,
            subscription,
        }
    }
}
/// Gates every notification on the state of the wrapped subscription.
///
/// While the subscription is open, events are passed straight through to the
/// inner observer. A terminal event (`complete` or `error`) is delivered once
/// and then closes the subscription, so every later call becomes a no-op.
impl<Item, Err, S, U> Observer<Item, Err> for Subscriber<S, U>
where
    S: Observer<Item, Err>,
    U: SubscriptionLike,
{
    /// Forwards `v` to the inner observer unless the subscription is closed.
    fn next(&mut self, v: &Item) {
        if self.subscription.is_closed() {
            return;
        }
        self.observer.next(v);
    }

    /// Notifies the inner observer of completion, then unsubscribes.
    /// Does nothing if the subscription is already closed.
    fn complete(&mut self) {
        if self.subscription.is_closed() {
            return;
        }
        self.observer.complete();
        self.subscription.unsubscribe();
    }

    /// Notifies the inner observer of `err`, then unsubscribes.
    /// Does nothing if the subscription is already closed.
    fn error(&mut self, err: &Err) {
        if self.subscription.is_closed() {
            return;
        }
        self.observer.error(err);
        self.subscription.unsubscribe();
    }
}
/// A `Subscriber` is itself subscription-like: every operation is delegated
/// unchanged to the wrapped subscription, and the observer is not involved.
impl<O, U> SubscriptionLike for Subscriber<O, U>
where
    U: SubscriptionLike,
{
    /// Unsubscribes the wrapped subscription.
    #[inline(always)]
    fn unsubscribe(&mut self) {
        U::unsubscribe(&mut self.subscription);
    }

    /// Reports whether the wrapped subscription is closed.
    #[inline(always)]
    fn is_closed(&self) -> bool {
        U::is_closed(&self.subscription)
    }

    /// Returns whatever address the wrapped subscription reports for itself.
    #[inline(always)]
    fn inner_addr(&self) -> *const () {
        U::inner_addr(&self.subscription)
    }
}
// Unit tests for `Subscriber`: each test emits two values, then a terminal
// event (`complete` or `error`), then two more values, and checks that only
// the events before and including the terminal one reached the observer.
#[cfg(test)]
mod test {
use crate::prelude::*;
use std::sync::{Arc, Mutex};
// Shared subscriber: values sent after `complete` must not be counted.
#[test]
fn shared_next_complete() {
let (next, _, complete, mut subscriber) = shared_subscriber_creator();
subscriber.next(&1);
subscriber.next(&2);
subscriber.complete();
// These two arrive after completion and are expected to be dropped.
subscriber.next(&3);
subscriber.next(&4);
assert_eq!(*next.lock().unwrap(), 2);
assert_eq!(*complete.lock().unwrap(), 1);
}
// Shared subscriber: values sent after `error` must not be counted.
#[test]
fn shared_err_complete() {
let (next, error, _, mut subscriber) = shared_subscriber_creator();
subscriber.next(&1);
subscriber.next(&2);
subscriber.error(&());
// These two arrive after the error and are expected to be dropped.
subscriber.next(&3);
subscriber.next(&4);
assert_eq!(*next.lock().unwrap(), 2);
assert_eq!(*error.lock().unwrap(), 1);
}
// Tuple returned by `shared_subscriber_creator`, in order:
// (next counter, error counter, complete counter, subscriber under test).
type SubscriberInfo<O> =
(Arc<Mutex<i32>>, Arc<Mutex<i32>>, Arc<Mutex<i32>>, O);
// Builds a subscriber via `Subscriber::shared(..).to_shared()` whose three
// callbacks each increment their own `Arc<Mutex<i32>>` counter, and returns
// clones of those counters alongside it so tests can inspect them.
fn shared_subscriber_creator() -> SubscriberInfo<impl Observer<i32, ()>> {
let next = Arc::new(Mutex::new(0));
let err = Arc::new(Mutex::new(0));
let complete = Arc::new(Mutex::new(0));
(
next.clone(),
err.clone(),
complete.clone(),
Subscriber::shared(SubscribeAll::new(
move |_: &_| *next.lock().unwrap() += 1,
move |_: &_| *err.lock().unwrap() += 1,
move || *complete.lock().unwrap() += 1,
))
.to_shared(),
)
}
// Local subscriber: values sent after `complete` must not be counted.
// The closures borrow the local counters mutably, so the counters are only
// read after the last use of `subscriber`.
#[test]
fn next_and_complete() {
let mut next = 0;
let mut complete = 0;
let mut subscriber = Subscriber::local(SubscribeComplete::new(
|_: &_| next += 1,
|| complete += 1,
));
subscriber.next(&1);
subscriber.next(&2);
subscriber.complete();
// These two arrive after completion and are expected to be dropped.
subscriber.next(&3);
subscriber.next(&4);
assert_eq!(next, 2);
assert_eq!(complete, 1);
}
// Local subscriber: values sent after `error` must not be counted.
#[test]
fn next_and_error() {
let mut next = 0;
let mut err = 0;
let mut subscriber = Subscriber::local(SubscribeErr::new(
|_: &_| next += 1,
|_: &()| err += 1,
));
subscriber.next(&1);
subscriber.next(&2);
subscriber.error(&());
// These two arrive after the error and are expected to be dropped.
subscriber.next(&3);
subscriber.next(&4);
assert_eq!(next, 2);
assert_eq!(err, 1);
}
}