use crate::prelude::*;
/// Couples an observer with a subscription handle.
///
/// The `Observer` implementation for this type forwards notifications to
/// `observer`, and the `SubscriptionLike` implementation forwards to
/// `subscription`.
#[derive(Clone)]
pub struct Subscriber<O, U> {
  /// The wrapped observer that receives forwarded notifications.
  pub(crate) observer: O,
  /// The subscription handle that `is_closed` / `unsubscribe` delegate to.
  pub(crate) subscription: U,
}
impl<O> Subscriber<O, LocalSubscription> {
  /// Wraps `observer` together with a freshly created
  /// `LocalSubscription::default()`.
  pub fn local(observer: O) -> Self {
    let subscription = LocalSubscription::default();
    Self {
      observer,
      subscription,
    }
  }
}
impl<O> Subscriber<O, SharedSubscription> {
  /// Wraps `observer` together with a freshly created
  /// `SharedSubscription::default()`.
  pub fn shared(observer: O) -> Self {
    let subscription = SharedSubscription::default();
    Self {
      observer,
      subscription,
    }
  }
}
/// Forwards notifications to the wrapped observer, but only while
/// `is_finished()` reports `false`.
///
/// NOTE(review): `is_finished` is not defined in this file; presumably it
/// comes in through the prelude. Confirm exactly which conditions (stopped
/// observer, closed subscription, or both) make it return `true`.
impl<Item, Err, O, U> Observer for Subscriber<O, U>
where
  O: Observer<Item = Item, Err = Err>,
  U: SubscriptionLike,
{
  type Item = Item;
  type Err = Err;

  /// Passes `v` to the wrapped observer unless this subscriber is finished.
  fn next(&mut self, v: Item) {
    if self.is_finished() {
      return;
    }
    self.observer.next(v);
  }

  /// Passes `err` to the wrapped observer unless this subscriber is
  /// finished.
  fn error(&mut self, err: Err) {
    if self.is_finished() {
      return;
    }
    self.observer.error(err);
  }

  /// Signals completion to the wrapped observer unless this subscriber is
  /// finished.
  fn complete(&mut self) {
    if self.is_finished() {
      return;
    }
    self.observer.complete();
  }

  /// Reports whatever the wrapped observer reports; the subscription is not
  /// consulted here.
  #[inline]
  fn is_stopped(&self) -> bool {
    self.observer.is_stopped()
  }
}
/// Subscription behaviour is delegated entirely to the `subscription`
/// field; the wrapped observer is not touched.
impl<O, U> SubscriptionLike for Subscriber<O, U>
where
  U: SubscriptionLike,
{
  /// Returns the closed state reported by the inner subscription.
  #[inline]
  fn is_closed(&self) -> bool {
    self.subscription.is_closed()
  }

  /// Calls `unsubscribe` on the inner subscription.
  #[inline]
  fn unsubscribe(&mut self) {
    self.subscription.unsubscribe();
  }
}
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::{Arc, Mutex};

  /// Three shared call counters (next, error, complete, in that order)
  /// followed by the subscriber under test.
  type SubscriberInfo<O> =
    (Arc<Mutex<i32>>, Arc<Mutex<i32>>, Arc<Mutex<i32>>, O);

  /// Builds a shared subscriber whose callbacks each bump their own counter,
  /// and hands back clones of those counters for inspection.
  fn shared_subscriber_creator()
  -> SubscriberInfo<impl Observer<Item = i32, Err = ()>> {
    let next_count = Arc::new(Mutex::new(0));
    let err_count = Arc::new(Mutex::new(0));
    let complete_count = Arc::new(Mutex::new(0));

    // Clone the handles first: the originals are moved into the closures.
    let next_handle = next_count.clone();
    let err_handle = err_count.clone();
    let complete_handle = complete_count.clone();

    let observer = ObserverAll::new(
      move |_| *next_count.lock().unwrap() += 1,
      move |_| *err_count.lock().unwrap() += 1,
      move || *complete_count.lock().unwrap() += 1,
    );

    (
      next_handle,
      err_handle,
      complete_handle,
      Subscriber::shared(observer),
    )
  }

  /// Values emitted after `complete` must not reach the observer.
  #[test]
  fn shared_next_complete() {
    let (next_hits, _, complete_hits, mut subscriber) =
      shared_subscriber_creator();

    subscriber.next(1);
    subscriber.next(2);
    subscriber.complete();
    // These two arrive after completion and are expected to be dropped.
    subscriber.next(3);
    subscriber.next(4);

    assert_eq!(*next_hits.lock().unwrap(), 2);
    assert_eq!(*complete_hits.lock().unwrap(), 1);
  }

  /// Values emitted after `error` must not reach the observer.
  #[test]
  fn shared_err_complete() {
    let (next_hits, error_hits, _, mut subscriber) =
      shared_subscriber_creator();

    subscriber.next(1);
    subscriber.next(2);
    subscriber.error(());
    // These two arrive after the error and are expected to be dropped.
    subscriber.next(3);
    subscriber.next(4);

    assert_eq!(*next_hits.lock().unwrap(), 2);
    assert_eq!(*error_hits.lock().unwrap(), 1);
  }

  /// Local variant of the completion test, counting through borrowed locals.
  #[test]
  fn next_and_complete() {
    let mut next_hits = 0;
    let mut complete_hits = 0;

    let mut subscriber = Subscriber::local(ObserverAll::new(
      |_: &_| next_hits += 1,
      |_: &i32| {},
      || complete_hits += 1,
    ));

    subscriber.next(&1);
    subscriber.next(&2);
    subscriber.complete();
    // Emissions after completion are expected to be ignored.
    subscriber.next(&3);
    subscriber.next(&4);

    assert_eq!(next_hits, 2);
    assert_eq!(complete_hits, 1);
  }

  /// Local variant of the error test, counting through borrowed locals.
  #[test]
  fn next_and_error() {
    let mut next_hits = 0;
    let mut err_hits = 0;

    let mut subscriber = Subscriber::local(ObserverErr::new(
      |_: &_| next_hits += 1,
      |_: &()| err_hits += 1,
    ));

    subscriber.next(&1);
    subscriber.next(&2);
    subscriber.error(&());
    // Emissions after the error are expected to be ignored.
    subscriber.next(&3);
    subscriber.next(&4);

    assert_eq!(next_hits, 2);
    assert_eq!(err_hits, 1);
  }
}