use crate::{
context::{Context, RcDeref, RcDerefMut},
observable::{CoreObservable, ObservableType},
observer::Observer,
subscription::{Subscription, TupleSubscription},
};
/// Observable produced by the `sample` operator.
///
/// Combines a `source` observable with a `sampler` observable: the most
/// recent source value is held back and is only forwarded downstream when the
/// sampler emits or completes.
#[derive(Clone)]
pub struct Sample<S, N> {
/// Upstream observable whose latest value is buffered.
pub source: S,
/// Observable whose notifications trigger emission of the buffered value.
pub sampler: N,
}
/// `Sample` exposes exactly the item and error types of its source; the
/// sampler's item type is not part of the output.
impl<S, N> ObservableType for Sample<S, N>
where
S: ObservableType,
{
// Items are passed through unchanged from the source.
type Item<'a>
= S::Item<'a>
where
Self: 'a;
// Errors use the source's error type.
type Err = S::Err;
}
/// State shared by the source-side and sampler-side observers of a `Sample`
/// subscription.
pub struct SampleState<O, V> {
/// Downstream observer; taken (set to `None`) once an error or completion
/// has been forwarded to it.
observer: Option<O>,
/// Latest value stored from the source that has not been emitted yet.
value: Option<V>,
}
impl<O, V> SampleState<O, V> {
    /// Creates a state that owns `observer` and has no buffered value.
    pub fn new(observer: O) -> Self {
        Self { value: None, observer: Some(observer) }
    }

    /// Buffers `value`, replacing whatever was buffered before.
    #[inline]
    pub fn store(&mut self, value: V) {
        self.value = Some(value);
    }

    /// Forwards the buffered value (if any) to the downstream observer.
    ///
    /// The buffer is cleared even when the observer has already been taken,
    /// so each stored value is delivered at most once.
    pub fn emit_if_present<Err>(&mut self)
    where
        O: Observer<V, Err>,
    {
        let Some(pending) = self.value.take() else {
            return;
        };
        if let Some(downstream) = self.observer.as_mut() {
            downstream.next(pending);
        }
    }

    /// Sends `err` to the downstream observer, consuming it.
    ///
    /// Does nothing when the observer was already taken by an earlier
    /// terminal event.
    pub fn error<Err>(&mut self, err: Err)
    where
        O: Observer<V, Err>,
    {
        let Some(downstream) = self.observer.take() else {
            return;
        };
        downstream.error(err);
    }

    /// Completes the downstream observer, consuming it.
    ///
    /// Does nothing when the observer was already taken by an earlier
    /// terminal event.
    pub fn complete<Err>(&mut self)
    where
        O: Observer<V, Err>,
    {
        let Some(downstream) = self.observer.take() else {
            return;
        };
        downstream.complete();
    }

    /// Reports whether the downstream side is closed by delegating to
    /// `is_closed` on the `Option`-wrapped observer.
    // NOTE(review): presumably this is `true` once the observer has been
    // taken; that depends on the crate's `Observer` impl for `Option`.
    pub fn is_closed<Err>(&self) -> bool
    where
        O: Observer<V, Err>,
    {
        self.observer.is_closed()
    }
}
/// Observer subscribed to the source observable of a `Sample`.
pub struct SampleSourceObserver<StateRc, NProxy> {
/// Shared handle to the sampling state (buffered value + downstream).
state: StateRc,
/// Handle used to unsubscribe the sampler when the source errors or
/// completes.
notifier_proxy: NProxy,
}
/// Observer subscribed to the sampler observable of a `Sample`; its
/// notifications trigger emission of the buffered source value.
pub struct SampleSamplerObserver<StateRc> {
/// Shared handle to the sampling state (buffered value + downstream).
state: StateRc,
}
/// The context's shared-mutable handle (`C::RcMut`) around the `SampleState`
/// that holds the downstream observer `C::Inner` and a buffered
/// `S::Item<'a>`.
type SharedState<'a, C, S> =
<C as Context>::RcMut<SampleState<<C as Context>::Inner, <S as ObservableType>::Item<'a>>>;
/// Context type the source is subscribed with: `C` re-targeted (`C::With`) at
/// a `SampleSourceObserver`. `U` is the subscription type kept in the shared
/// `Option` slot that the source observer unsubscribes on termination.
type SampleSourceCtx<'a, C, S, U> = <C as Context>::With<
SampleSourceObserver<SharedState<'a, C, S>, <C as Context>::RcMut<Option<U>>>,
>;
/// Context type the sampler is subscribed with: `C` re-targeted (`C::With`)
/// at a `SampleSamplerObserver` sharing the same state.
type SampleSamplerCtx<'a, C, S> =
<C as Context>::With<SampleSamplerObserver<SharedState<'a, C, S>>>;
/// Subscription wiring for `Sample`.
///
/// The source is subscribed first and the sampler second, both sharing one
/// `SampleState`. The returned subscription pairs the source subscription
/// with a shared slot that holds the sampler subscription.
impl<S, N, C, SrcUnsub, NotifyUnsub> CoreObservable<C> for Sample<S, N>
where
    C: Context,
    S: for<'a> CoreObservable<SampleSourceCtx<'a, C, S, NotifyUnsub>, Unsub = SrcUnsub>,
    N: for<'a> CoreObservable<SampleSamplerCtx<'a, C, S>, Unsub = NotifyUnsub>,
    SrcUnsub: Subscription,
    NotifyUnsub: Subscription,
    C::RcMut<Option<NotifyUnsub>>: Subscription,
    for<'a> C::Inner: Observer<S::Item<'a>, S::Err>,
{
    type Unsub = TupleSubscription<SrcUnsub, C::RcMut<Option<NotifyUnsub>>>;

    fn subscribe(self, context: C) -> Self::Unsub {
        // State shared by both observers: downstream observer + latest value.
        let shared: SharedState<C, S> = C::RcMut::from(SampleState::new(context.into_inner()));
        // Starts empty; filled with the sampler subscription further down.
        let sampler_slot: C::RcMut<Option<N::Unsub>> = C::RcMut::from(None);

        // Subscribe the source before the sampler so that a value emitted
        // during subscription is already buffered when the sampler fires.
        let source_handle = self.source.subscribe(C::lift(SampleSourceObserver {
            state: shared.clone(),
            notifier_proxy: sampler_slot.clone(),
        }));

        let sampler_handle = self
            .sampler
            .subscribe(C::lift(SampleSamplerObserver { state: shared.clone() }));

        // If downstream was closed while subscribing, the sampler is of no
        // further use; otherwise park its subscription in the shared slot so
        // the source observer can cancel it later.
        let already_closed = shared.rc_deref().is_closed::<S::Err>();
        if already_closed {
            sampler_handle.unsubscribe();
        } else {
            *sampler_slot.rc_deref_mut() = Some(sampler_handle);
        }

        TupleSubscription::new(source_handle, sampler_slot)
    }
}
/// Source-side behaviour: values are buffered, terminal events are forwarded
/// downstream after the sampler has been unsubscribed.
impl<Item, Err, O, StateRc, NProxy> Observer<Item, Err> for SampleSourceObserver<StateRc, NProxy>
where
    StateRc: RcDerefMut<Target = SampleState<O, Item>>,
    NProxy: Subscription,
    O: Observer<Item, Err>,
{
    /// Buffers `value` as the latest sample candidate; nothing is emitted here.
    fn next(&mut self, value: Item) {
        let mut shared = self.state.rc_deref_mut();
        shared.store(value);
    }

    /// Unsubscribes the sampler, then forwards `err` downstream.
    fn error(self, err: Err) {
        let Self { state, notifier_proxy } = self;
        notifier_proxy.unsubscribe();
        state.rc_deref_mut().error(err);
    }

    /// Unsubscribes the sampler, then completes downstream.
    fn complete(self) {
        let Self { state, notifier_proxy } = self;
        notifier_proxy.unsubscribe();
        state.rc_deref_mut().complete::<Err>();
    }

    /// Mirrors the closed status of the shared state.
    fn is_closed(&self) -> bool {
        let shared = self.state.rc_deref();
        shared.is_closed::<Err>()
    }
}
/// Sampler-side behaviour: every notification flushes the buffered source
/// value; the sampler's own item is ignored.
impl<Item, Err, SamplerItem, O, StateRc> Observer<SamplerItem, Err>
    for SampleSamplerObserver<StateRc>
where
    StateRc: RcDerefMut<Target = SampleState<O, Item>>,
    O: Observer<Item, Err>,
{
    /// Emits the buffered source value, if there is one.
    fn next(&mut self, _: SamplerItem) {
        let mut shared = self.state.rc_deref_mut();
        shared.emit_if_present::<Err>();
    }

    /// Forwards the sampler's error downstream.
    fn error(self, err: Err) {
        let Self { state } = self;
        state.rc_deref_mut().error(err);
    }

    /// Flushes any buffered value and then completes downstream.
    fn complete(self) {
        // A single guard covers both steps, so they run under one borrow.
        let mut shared = self.state.rc_deref_mut();
        shared.emit_if_present::<Err>();
        shared.complete::<Err>();
    }

    /// Mirrors the closed status of the shared state.
    fn is_closed(&self) -> bool {
        let shared = self.state.rc_deref();
        shared.is_closed::<Err>()
    }
}
// Behavioural tests for the `sample` operator, driven by subjects so that
// the order of source and sampler events is fully controlled.
#[cfg(test)]
mod tests {
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::prelude::*;
// Each sampler tick emits the latest source value seen since the last tick.
#[rxrust_macro::test]
fn test_sample_emits_on_notifier() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut source = Local::subject::<i32, Infallible>();
let mut sampler = Local::subject::<(), Infallible>();
source
.clone()
.sample(sampler.clone())
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
// 1 is overwritten by 2 before the first tick, so only 2 is emitted.
source.next(1);
source.next(2);
sampler.next(()); source.next(3);
sampler.next(());
assert_eq!(*result.borrow(), vec![2, 3]);
}
// Sampler ticks without any source value emit nothing.
#[rxrust_macro::test]
fn test_sample_no_value_when_sampler_emits() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let source = Local::subject::<i32, Infallible>();
let mut sampler = Local::subject::<(), Infallible>();
source
.clone()
.sample(sampler.clone())
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
sampler.next(());
sampler.next(());
assert!(result.borrow().is_empty());
}
// Completion of the sampler flushes the buffered value.
#[rxrust_macro::test]
fn test_sample_emits_on_sampler_complete() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut source = Local::subject::<i32, Infallible>();
let sampler = Local::subject::<(), Infallible>();
source
.clone()
.sample(sampler.clone())
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
source.next(1);
source.next(2);
sampler.complete();
assert_eq!(*result.borrow(), vec![2]);
}
// Completion of the source completes the downstream observer.
#[rxrust_macro::test]
fn test_sample_source_complete_completes_downstream() {
let completed = Rc::new(RefCell::new(false));
let completed_clone = completed.clone();
let mut source = Local::subject::<i32, Infallible>();
let sampler = Local::subject::<(), Infallible>();
source
.clone()
.sample(sampler.clone())
.on_complete(move || *completed_clone.borrow_mut() = true)
.subscribe(|_: i32| {});
source.next(1);
source.complete();
assert!(*completed.borrow());
}
// A value is consumed by the tick that emits it: the second consecutive
// tick finds the buffer empty and emits nothing.
#[rxrust_macro::test]
fn test_sample_each_sampler_takes_value() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut source = Local::subject::<i32, Infallible>();
let mut sampler = Local::subject::<(), Infallible>();
source
.clone()
.sample(sampler.clone())
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
source.next(1);
sampler.next(()); sampler.next(()); source.next(2);
sampler.next(());
assert_eq!(*result.borrow(), vec![1, 2]);
}
// With behavior subjects on both sides, the initial source value is already
// emitted by the time `subscribe` returns (checked by the first assertion).
#[rxrust_macro::test]
fn test_sample_behavior_subject_source_and_sampler() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut source = Local::behavior_subject(1);
let mut sampler = Local::behavior_subject(());
source
.clone()
.sample(sampler.clone())
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1]);
source.next(2);
sampler.next(());
assert_eq!(*result.borrow(), vec![1, 2]);
}
}