use crate::observer::{
complete_proxy_impl, error_proxy_impl, is_stopped_proxy_impl,
};
use crate::prelude::*;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
#[derive(Clone)]
pub struct FlattenOp<S, Inner> {
pub(crate) source: S,
pub(crate) marker: std::marker::PhantomData<Inner>,
}
impl<Outer, Inner, Item, Err> Observable for FlattenOp<Outer, Inner>
where
Outer: Observable<Item = Inner, Err = Err>,
Inner: Observable<Item = Item, Err = Err>,
{
type Item = Item;
type Err = Err;
}
pub struct FlattenState {
total: u64,
done: u64,
is_completed: bool,
}
impl FlattenState {
#[inline]
pub fn new() -> Self { Self::default() }
pub fn is_completed(&self) -> bool { self.is_completed }
pub fn register_new_observable(&mut self) {
if self.is_completed {
return;
}
self.total += 1;
}
pub fn register_observable_error(&mut self) -> bool {
if self.is_completed {
false
} else {
self.is_completed = true;
true
}
}
pub fn register_observable_completed(&mut self) -> bool {
if self.is_completed {
return false;
}
self.done += 1;
if self.total == self.done {
self.is_completed = true;
true
} else {
false
}
}
}
impl Default for FlattenState {
fn default() -> Self {
FlattenState {
total: 1,
done: 0,
is_completed: false,
}
}
}
#[derive(Clone)]
pub struct FlattenInnerObserver<O, S, St> {
observer: O,
subscription: S,
state: St,
}
impl<O, S, St, Item, Err> Observer for FlattenInnerObserver<O, S, St>
where
O: Observer<Item = Item, Err = Err>,
S: SubscriptionLike,
St: InnerDerefMut<Target = FlattenState>,
{
type Item = Item;
type Err = Err;
fn next(&mut self, item: Self::Item) {
let state = self.state.inner_deref();
let is_completed = state.is_completed;
drop(state);
if !is_completed {
self.observer.next(item);
}
}
fn error(&mut self, err: Self::Err) {
let mut state = self.state.inner_deref_mut();
let should_error = state.register_observable_error();
drop(state);
if should_error {
self.observer.error(err);
self.subscription.unsubscribe();
}
}
fn complete(&mut self) {
let mut state = self.state.inner_deref_mut();
let should_complete = state.register_observable_completed();
drop(state);
if should_complete {
self.observer.complete();
self.subscription.unsubscribe();
}
}
fn is_stopped(&self) -> bool {
let state = self.state.inner_deref();
state.is_completed()
}
}
type SharedInnerObserver<O> =
FlattenInnerObserver<O, SharedSubscription, Arc<Mutex<FlattenState>>>;
#[derive(Clone)]
pub struct FlattenSharedOuterObserver<Inner, O> {
marker: std::marker::PhantomData<Inner>,
inner_observer: Arc<Mutex<SharedInnerObserver<O>>>,
subscription: SharedSubscription,
state: Arc<Mutex<FlattenState>>,
}
impl<Inner, O, Item, Err> Observer for FlattenSharedOuterObserver<Inner, O>
where
O: Observer<Item = Item, Err = Err> + Sync + Send + 'static,
Inner: SharedObservable<Item = Item, Err = Err, Unsub = SharedSubscription>,
{
type Item = Inner;
type Err = Err;
fn next(&mut self, value: Inner) {
let mut state = self.state.lock().unwrap();
state.register_new_observable();
drop(state);
self.subscription.add(value.actual_subscribe(Subscriber {
observer: self.inner_observer.clone(),
subscription: SharedSubscription::default(),
}));
}
error_proxy_impl!(Err, inner_observer);
complete_proxy_impl!(inner_observer);
is_stopped_proxy_impl!(inner_observer);
}
impl<Outer, Inner, Item, Err> SharedObservable for FlattenOp<Outer, Inner>
where
Outer: SharedObservable<Item = Inner, Err = Err>,
Outer::Unsub: Send + Sync,
Inner: SharedObservable<Item = Item, Err = Err, Unsub = SharedSubscription>
+ Send
+ Sync
+ 'static,
{
type Unsub = SharedSubscription;
fn actual_subscribe<O>(
self,
subscriber: Subscriber<O, SharedSubscription>,
) -> Self::Unsub
where
O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
{
let state = Arc::new(Mutex::new(FlattenState::new()));
let subscription = subscriber.subscription;
let inner_observer = Arc::new(Mutex::new(FlattenInnerObserver {
observer: subscriber.observer,
subscription: subscription.clone(),
state: state.clone(),
}));
let observer = FlattenSharedOuterObserver {
marker: std::marker::PhantomData::<Inner>,
inner_observer,
subscription: subscription.clone(),
state,
};
subscription
.add(self.source.actual_subscribe(Subscriber::shared(observer)));
subscription
}
}
type LocalInnerObserver<O> =
FlattenInnerObserver<O, LocalSubscription, Rc<RefCell<FlattenState>>>;
#[derive(Clone)]
pub struct FlattenLocalOuterObserver<Inner, O> {
marker: std::marker::PhantomData<Inner>,
inner_observer: Rc<RefCell<LocalInnerObserver<O>>>,
subscription: LocalSubscription,
state: Rc<RefCell<FlattenState>>,
}
impl<'a, Inner, O, Item, Err> Observer for FlattenLocalOuterObserver<Inner, O>
where
O: Observer<Item = Item, Err = Err> + 'a,
Inner: LocalObservable<'a, Item = Item, Err = Err, Unsub = LocalSubscription>,
{
type Item = Inner;
type Err = Err;
fn next(&mut self, value: Inner) {
let mut state = self.state.borrow_mut();
state.register_new_observable();
drop(state);
self.subscription.add(
value.actual_subscribe(Subscriber::local(self.inner_observer.clone())),
);
}
error_proxy_impl!(Err, inner_observer);
complete_proxy_impl!(inner_observer);
is_stopped_proxy_impl!(inner_observer);
}
impl<'a, Outer, Inner, Item, Err> LocalObservable<'a>
for FlattenOp<Outer, Inner>
where
Outer: LocalObservable<'a, Item = Inner, Err = Err>,
Inner:
LocalObservable<'a, Item = Item, Err = Err, Unsub = LocalSubscription> + 'a,
{
type Unsub = LocalSubscription;
fn actual_subscribe<O>(
self,
subscriber: Subscriber<O, LocalSubscription>,
) -> Self::Unsub
where
O: Observer<Item = Self::Item, Err = Self::Err> + 'a,
{
let state = Rc::new(RefCell::new(FlattenState::new()));
let subscription = subscriber.subscription;
let inner_observer = Rc::new(RefCell::new(FlattenInnerObserver {
observer: subscriber.observer,
subscription: subscription.clone(),
state: state.clone(),
}));
let observer = FlattenLocalOuterObserver {
marker: std::marker::PhantomData::<Inner>,
inner_observer,
subscription: subscription.clone(),
state,
};
subscription.add(self.source.actual_subscribe(Subscriber::local(observer)));
subscription
}
}
#[cfg(test)]
mod test {
extern crate test;
use crate::prelude::*;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use test::Bencher;
#[test]
fn odd_even_flatten() {
let mut odd_store = vec![];
let mut even_store = vec![];
let mut numbers_store = vec![];
{
let mut sources = Subject::new();
let numbers = sources.clone().flatten();
let odd = numbers.clone().filter(|v: &i32| *v % 2 != 0);
let even = numbers.clone().filter(|v: &i32| *v % 2 == 0);
numbers.subscribe(|v: i32| numbers_store.push(v));
odd.subscribe(|v: i32| odd_store.push(v));
even.subscribe(|v: i32| even_store.push(v));
(0..10).for_each(|v| {
let source = observable::of(v);
sources.next(source);
});
}
assert_eq!(even_store, vec![0, 2, 4, 6, 8]);
assert_eq!(odd_store, vec![1, 3, 5, 7, 9]);
assert_eq!(numbers_store, (0..10).collect::<Vec<_>>());
}
#[test]
fn flatten_unsubscribe_work() {
let mut source = Subject::new();
let sources = source.clone().map(|v| observable::from_iter(vec![v]));
let numbers = sources.flatten();
let _even = numbers.clone().filter(|v| *v % 2 == 0);
let _odd = numbers.clone().filter(|v| *v % 2 != 0);
numbers
.subscribe(|_| unreachable!("oh, unsubscribe does not work."))
.unsubscribe();
source.next(&1);
}
#[test]
fn flatten_completed_test() {
let completed = Arc::new(AtomicBool::new(false));
let c_clone = completed.clone();
let mut source = Subject::new();
let mut one = Subject::new();
let mut two = Subject::new();
let out = source.clone().flatten();
out.subscribe_complete(
|_: &()| {},
move || {
println!("subscribe_complete complete callback done");
completed.store(true, Ordering::Relaxed);
},
);
source.next(one.clone());
source.next(two.clone());
one.complete();
assert!(!c_clone.load(Ordering::Relaxed));
two.complete();
assert!(!c_clone.load(Ordering::Relaxed));
source.complete();
assert!(c_clone.load(Ordering::Relaxed));
}
#[test]
fn flatten_error_test() {
let completed = Arc::new(Mutex::new(0));
let cc = completed.clone();
let error = Arc::new(Mutex::new(0));
let ec = error.clone();
let mut source = Subject::new();
let mut even = Subject::new();
let mut odd = Subject::new();
let output = source.clone().flatten();
output.subscribe_all(
|_: ()| {},
move |_| *error.lock().unwrap() += 1,
move || *completed.lock().unwrap() += 1,
);
source.next(even.clone());
source.next(odd.clone());
odd.error("");
even.error("");
even.complete();
assert_eq!(*cc.lock().unwrap(), 0);
assert_eq!(*ec.lock().unwrap(), 1);
}
#[test]
fn flatten_local_and_shared() {
let mut res = vec![];
let mut source = Subject::new();
let local1 = observable::of(1);
let local2 = observable::of(2);
let shared = source.clone().flatten().into_shared();
shared.subscribe(move |v: i32| {
res.push(v);
});
source.next(local1);
source.next(local2);
}
#[bench]
fn bench_flatten(b: &mut Bencher) { b.iter(odd_even_flatten); }
}