use crate::{
context::{Context, RcDeref},
observable::{CoreObservable, ObservableType},
observer::Observer,
subscription::{Subscription, TupleSubscription},
};
#[derive(Clone)]
pub struct TakeUntil<S, N> {
pub source: S,
pub notifier: N,
}
pub struct TakeUntilObserver<ObserverRc, NProxy> {
observer_rc: ObserverRc,
notifier_proxy: NProxy,
}
pub struct TakeUntilNotifierObserver<ObserverRc> {
observer_rc: ObserverRc,
complete_fn: fn(ObserverRc),
}
impl<ObserverRc: Clone> TakeUntilNotifierObserver<ObserverRc> {
pub fn new<Item, Err>(observer_rc: ObserverRc) -> Self
where
ObserverRc: Observer<Item, Err>,
{
Self { observer_rc, complete_fn: |o| o.complete() }
}
}
impl<S, N> ObservableType for TakeUntil<S, N>
where
S: ObservableType,
{
type Item<'a>
= S::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
impl<S, N, C> CoreObservable<C> for TakeUntil<S, N>
where
C: Context,
S: CoreObservable<
C::With<TakeUntilObserver<C::RcMut<Option<C::Inner>>, C::RcMut<Option<N::Unsub>>>>,
>,
N: CoreObservable<C::With<TakeUntilNotifierObserver<C::RcMut<Option<C::Inner>>>>>,
C::RcMut<Option<N::Unsub>>: Subscription,
for<'a> C::RcMut<Option<C::Inner>>: Observer<S::Item<'a>, S::Err>,
{
type Unsub = TupleSubscription<S::Unsub, C::RcMut<Option<N::Unsub>>>;
fn subscribe(self, context: C) -> Self::Unsub {
let TakeUntil { source, notifier } = self;
let downstream = context.into_inner();
let observer_rc = C::RcMut::from(Some(downstream));
let notifier_observer =
TakeUntilNotifierObserver::new::<S::Item<'_>, S::Err>(observer_rc.clone());
let notifier_ctx = C::lift(notifier_observer);
let notifier_unsub = notifier.subscribe(notifier_ctx);
let notifier_proxy: C::RcMut<Option<N::Unsub>> = C::RcMut::from(Some(notifier_unsub));
let source_observer = TakeUntilObserver { observer_rc, notifier_proxy: notifier_proxy.clone() };
let source_ctx = C::lift(source_observer);
let source_unsub = source.subscribe(source_ctx);
TupleSubscription::new(source_unsub, notifier_proxy)
}
}
impl<Item, Err, ObserverRc, NProxy> Observer<Item, Err> for TakeUntilObserver<ObserverRc, NProxy>
where
ObserverRc: Observer<Item, Err> + Clone,
NProxy: Subscription,
{
fn next(&mut self, value: Item) { self.observer_rc.clone().next(value); }
fn error(self, err: Err) {
self.observer_rc.clone().error(err);
self.notifier_proxy.unsubscribe();
}
fn complete(self) {
self.observer_rc.clone().complete();
self.notifier_proxy.unsubscribe();
}
fn is_closed(&self) -> bool { self.observer_rc.is_closed() }
}
impl<NotifyItem, NotifyErr, ObserverRc, O> Observer<NotifyItem, NotifyErr>
for TakeUntilNotifierObserver<ObserverRc>
where
ObserverRc: Clone + RcDeref<Target = Option<O>>,
{
fn next(&mut self, _value: NotifyItem) { (self.complete_fn)(self.observer_rc.clone()); }
fn error(self, _err: NotifyErr) {
}
fn complete(self) {
}
fn is_closed(&self) -> bool { self.observer_rc.rc_deref().is_none() }
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::prelude::*;
#[rxrust_macro::test]
fn test_take_until_emits_until_notifier_emits() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut notifier = Local::subject::<(), Infallible>();
let mut source = Local::subject::<i32, Infallible>();
source
.clone()
.take_until(notifier.clone())
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
source.next(1);
source.next(2);
notifier.next(());
source.next(3);
assert_eq!(*result.borrow(), vec![1, 2]);
}
#[rxrust_macro::test]
fn test_take_until_complete() {
let completed = Rc::new(RefCell::new(false));
let completed_clone = completed.clone();
let mut notifier = Local::subject::<(), Infallible>();
let mut source = Local::subject::<i32, Infallible>();
source
.clone()
.take_until(notifier.clone())
.on_complete(move || *completed_clone.borrow_mut() = true)
.subscribe(|_: i32| {});
source.next(1);
notifier.next(());
assert!(*completed.borrow());
}
#[rxrust_macro::test]
fn test_take_until_source_complete() {
let completed = Rc::new(RefCell::new(false));
let completed_clone = completed.clone();
let notifier = Local::subject::<(), Infallible>();
let mut source = Local::subject::<i32, Infallible>();
source
.clone()
.take_until(notifier.clone())
.on_complete(move || *completed_clone.borrow_mut() = true)
.subscribe(|_: i32| {});
source.next(1);
source.complete();
assert!(*completed.borrow());
}
#[rxrust_macro::test]
fn test_take_until_notifier_complete_does_nothing() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let notifier = Local::subject::<(), Infallible>();
let mut source = Local::subject::<i32, Infallible>();
source
.clone()
.take_until(notifier.clone())
.subscribe(move |v| result_clone.borrow_mut().push(v));
source.next(1);
notifier.complete(); source.next(2);
assert_eq!(*result.borrow(), vec![1, 2]);
}
}