use crate::{
context::{Context, RcDeref, RcDerefMut, Scope},
observable::{CoreObservable, ObservableType},
observer::Observer,
subscription::{IntoBoxedSubscription, Subscription, TupleSubscription},
};
#[derive(Clone)]
pub struct SwitchMap<S, F> {
pub source: S,
pub func: F,
}
#[doc(hidden)]
pub struct SwitchMapState<O, InnerSub> {
observer: O,
outer_completed: bool,
inner_sub: Option<InnerSub>,
}
impl<O, InnerSub> Subscription for SwitchMapState<O, InnerSub>
where
InnerSub: Subscription,
{
fn unsubscribe(mut self) {
if let Some(inner) = self.inner_sub.take() {
inner.unsubscribe();
}
}
fn is_closed(&self) -> bool { false }
}
#[doc(hidden)]
#[derive(Clone)]
pub struct SwitchMapOuterObserver<Sc: Scope, O, F> {
state: SwitchState<Sc, O>,
func: F,
}
#[doc(hidden)]
#[derive(Clone)]
pub struct SwitchMapInnerObserver<State>(State);
pub type SwitchMapSubscription<SrcSub, InnerSub> = TupleSubscription<SrcSub, InnerSub>;
impl<S, F, Out> ObservableType for SwitchMap<S, F>
where
S: ObservableType,
F: for<'a> FnMut(S::Item<'a>) -> Out,
Out: Context<Inner: ObservableType<Err = S::Err> + 'static>,
{
type Item<'a>
= <Out::Inner as ObservableType>::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
type SwitchState<Sc, O> =
<Sc as Scope>::RcMut<Option<SwitchMapState<O, <Sc as Scope>::BoxedSubscription>>>;
type InnerObserverCtx<C> = <C as Context>::With<
SwitchMapInnerObserver<SwitchState<<C as Context>::Scope, <C as Context>::Inner>>,
>;
impl<S, F, C, Out, InnerObs> CoreObservable<C> for SwitchMap<S, F>
where
C: Context,
S: CoreObservable<C::With<SwitchMapOuterObserver<C::Scope, C::Inner, F>>>,
F: for<'a> FnMut(S::Item<'a>) -> Out,
Out: Context<Inner = InnerObs>,
InnerObs: CoreObservable<InnerObserverCtx<C>, Err = S::Err> + 'static,
InnerObs::Unsub: IntoBoxedSubscription<C::BoxedSubscription>,
SwitchState<C::Scope, C::Inner>: Subscription,
{
type Unsub = SwitchMapSubscription<S::Unsub, SwitchState<C::Scope, C::Inner>>;
fn subscribe(self, context: C) -> Self::Unsub {
let SwitchMap { source, func } = self;
let state: SwitchState<C::Scope, C::Inner> = <C::Scope as Scope>::RcMut::from(None);
let wrapped = context.transform(|observer| {
*state.rc_deref_mut() =
Some(SwitchMapState { observer, outer_completed: false, inner_sub: None });
SwitchMapOuterObserver { state: state.clone(), func }
});
let source_unsub = source.subscribe(wrapped);
TupleSubscription::new(source_unsub, state)
}
}
impl<Sc, O, InnerObs, Item, Err, F, Out> Observer<Item, Err> for SwitchMapOuterObserver<Sc, O, F>
where
Sc: Scope,
O: for<'a> Observer<InnerObs::Item<'a>, Err>,
F: for<'a> FnMut(Item) -> Out,
Out: Context<Inner = InnerObs, Scope = Sc>,
InnerObs: CoreObservable<
Out::With<SwitchMapInnerObserver<SwitchState<Sc, O>>>,
Unsub: IntoBoxedSubscription<Sc::BoxedSubscription>,
>,
{
fn next(&mut self, value: Item) {
if self.is_closed() {
return;
}
{
let mut guard = self.state.rc_deref_mut();
if let Some(mut st) = guard.take() {
if let Some(prev) = st.inner_sub.take() {
prev.unsubscribe();
}
*guard = Some(st);
}
}
let inner_ctx = (self.func)(value);
let inner_obs = inner_ctx.into_inner();
let inner_observer = SwitchMapInnerObserver(self.state.clone());
let inner_unsub = inner_obs.subscribe(Out::lift(inner_observer));
if let Some(st) = self.state.rc_deref_mut().as_mut() {
st.inner_sub = Some(inner_unsub.into_boxed());
}
}
fn error(self, err: Err) {
if let Some(mut st) = self.state.rc_deref_mut().take() {
st.observer.error(err);
if let Some(inner) = st.inner_sub.take() {
inner.unsubscribe();
}
}
}
fn complete(self) {
let mut guard = self.state.rc_deref_mut();
let Some(st) = guard.as_mut() else { return };
st.outer_completed = true;
if st.inner_sub.is_none() {
let st = guard.take().unwrap();
st.observer.complete();
}
}
fn is_closed(&self) -> bool {
self
.state
.rc_deref()
.as_ref()
.is_none_or(|st| O::is_closed(&st.observer))
}
}
impl<Item, Err, O, State, InnerSubState> Observer<Item, Err> for SwitchMapInnerObserver<State>
where
O: Observer<Item, Err>,
State: RcDerefMut<Target = Option<SwitchMapState<O, InnerSubState>>> + Clone,
{
fn next(&mut self, value: Item) {
if let Some(st) = self.0.rc_deref_mut().as_mut() {
st.observer.next(value);
}
}
fn error(self, err: Err) {
if let Some(mut st) = self.0.rc_deref_mut().take() {
let _ = st.inner_sub.take();
st.observer.error(err);
}
}
fn complete(self) {
let mut guard = self.0.rc_deref_mut();
let Some(st) = guard.as_mut() else { return };
let _ = st.inner_sub.take();
if st.outer_completed {
let st = guard.take().unwrap();
st.observer.complete();
}
}
fn is_closed(&self) -> bool {
self
.0
.rc_deref()
.as_ref()
.is_none_or(|st| O::is_closed(&st.observer))
}
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, convert::Infallible, rc::Rc};
use crate::prelude::*;
#[rxrust_macro::test(local)]
async fn test_switch_map_only_latest_inner_emits() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut outer = Local::subject::<i32, Infallible>();
let mut inner1 = Local::subject::<&'static str, Infallible>();
let mut inner2 = Local::subject::<&'static str, Infallible>();
let inner1_for_map = inner1.clone();
let inner2_for_map = inner2.clone();
let _subscription = outer
.clone()
.switch_map(move |x| if x == 1 { inner1_for_map.clone() } else { inner2_for_map.clone() })
.subscribe(move |v| result_clone.borrow_mut().push(v));
outer.next(1);
inner1.next("a");
outer.next(2); inner1.next("b"); inner2.next("c");
assert_eq!(*result.borrow(), vec!["a", "c"]);
}
#[rxrust_macro::test(local)]
async fn test_switch_map_completion_waits_for_inner() {
let completed = Rc::new(RefCell::new(false));
let completed_clone = completed.clone();
let mut outer = Local::subject::<i32, Infallible>();
let inner = Local::subject::<i32, Infallible>();
let inner_for_map = inner.clone();
let _subscription = outer
.clone()
.switch_map(move |_| inner_for_map.clone())
.on_complete(move || {
*completed_clone.borrow_mut() = true;
})
.subscribe(|_| {});
outer.next(1);
outer.complete();
assert!(!*completed.borrow());
inner.complete();
assert!(*completed.borrow());
}
#[rxrust_macro::test(local)]
async fn test_switch_map_inner_error_errors_downstream() {
let got_error = Rc::new(RefCell::new(false));
let got_error_clone = got_error.clone();
let mut outer = Local::subject::<(), &'static str>();
let _subscription = outer
.clone()
.switch_map(|_| Local::throw_err("boom"))
.on_error(move |_e| {
*got_error_clone.borrow_mut() = true;
})
.subscribe(|_| {});
outer.next(());
assert!(*got_error.borrow());
}
}