use std::collections::VecDeque;
use crate::{
context::{Context, RcDerefMut},
observable::{CoreObservable, ObservableType},
observer::Observer,
subscription::{DynamicSubscriptions, IntoBoxedSubscription},
};
/// Operator value that flattens an observable whose items are themselves
/// observables into a single stream.
///
/// At most `concurrent` inner observables are subscribed at the same time;
/// inner observables that arrive while the limit is reached are queued and
/// subscribed later in arrival order.
///
/// Note: the outer observer admits a new inner observable only while
/// `subscribed < concurrent`, so a limit of `0` never admits anything.
#[derive(Clone)]
pub struct MergeAll<S> {
    /// The outer (higher-order) source.
    pub source: S,
    /// Maximum number of inner observables subscribed simultaneously.
    pub concurrent: usize,
}
/// Bookkeeping shared by the outer observer and every inner observer that
/// belongs to one `merge_all` subscription.
#[doc(hidden)]
pub struct MergeAllState<O, InnerObs> {
    /// Downstream observer; taken out (left as `None`) when a terminal
    /// notification is forwarded.
    observer: Option<O>,
    /// Inner observables waiting for a free concurrency slot, oldest first.
    pending_observables: VecDeque<InnerObs>,
    /// Number of concurrency slots currently occupied.
    subscribed: usize,
    /// Upper bound that `subscribed` is compared against before admitting a
    /// new inner observable.
    concurrent: usize,
    /// Set once the outer source has completed.
    outer_completed: bool,
}
impl<O, InnerObs> MergeAllState<O, InnerObs> {
    /// Reports whether nothing more can be delivered downstream.
    ///
    /// Returns `true` when the downstream observer has already been taken out
    /// of the state; otherwise returns whatever the observer's own
    /// `is_closed` reports.
    fn observer_is_closed<Item, Err>(&self) -> bool
    where
        O: Observer<Item, Err>,
    {
        match self.observer.as_ref() {
            Some(downstream) => O::is_closed(downstream),
            None => true,
        }
    }
}
/// Observer attached to the outer source of a `merge_all` subscription.
///
/// Holds two shared handles: the merge bookkeeping and the registry of inner
/// subscriptions.
#[doc(hidden)]
#[derive(Clone)]
pub struct MergeAllOuterObserver<P, SubState> {
    /// Shared handle to the merge bookkeeping.
    state: P,
    /// Shared handle to the registry of inner subscriptions.
    sub_state: SubState,
}
/// Observer attached to one inner observable of a `merge_all` subscription.
#[doc(hidden)]
#[derive(Clone)]
pub struct MergeAllInnerObserver<P, SubState> {
    /// Shared handle to the merge bookkeeping.
    state: P,
    /// Shared handle to the registry of inner subscriptions.
    sub_state: SubState,
    /// Id under which this observer's subscription is stored in `sub_state`.
    subscription_id: usize,
    /// Called with clones of both handles when this inner observable
    /// completes while other inner observables are still queued.
    subscribe_fn: fn(P, SubState),
}
/// Subscription handle returned by `merge_all`: the outer source's
/// subscription paired with the shared registry of inner subscriptions.
pub type MergeAllSubscription<SrcUnsub, SubState> =
    crate::subscription::SourceWithDynamicSubs<SrcUnsub, SubState>;
/// Type-level description of the flattened stream: errors keep the outer
/// source's error type, items are the items of the inner observables carried
/// by the outer source.
impl<S> ObservableType for MergeAll<S>
where
    S: ObservableType,
    for<'a> S::Item<'a>: Context<Inner: ObservableType>,
{
    type Err = S::Err;
    type Item<'a>
        = <<S::Item<'a> as Context>::Inner as ObservableType>::Item<'a>
    where
        Self: 'a;
}
// Shared, mutable registry of boxed inner subscriptions for context `C`.
type RcSubscriptions<C> =
    <C as Context>::RcMut<DynamicSubscriptions<<C as Context>::BoxedSubscription>>;
/// Wires a downstream observer to the outer source through a
/// `MergeAllOuterObserver`.
impl<S, C, InnerObs> CoreObservable<C> for MergeAll<S>
where
    C: Context,
    S: for<'a> CoreObservable<
        C::With<
            MergeAllOuterObserver<
                C::RcMut<MergeAllState<C::Inner, InnerObs>>,
                C::RcMut<DynamicSubscriptions<C::BoxedSubscription>>,
            >,
        >,
        Item<'a>: Context<Inner = InnerObs>,
    >,
    InnerObs: ObservableType,
{
    type Unsub = MergeAllSubscription<S::Unsub, RcSubscriptions<C>>;

    /// Subscribes to the outer source.
    ///
    /// The downstream observer carried by `context` is moved into a fresh
    /// shared `MergeAllState` (empty queue, no occupied slots, outer source
    /// not completed). The returned subscription pairs the outer source's
    /// subscription with the registry that collects inner subscriptions.
    fn subscribe(self, context: C) -> Self::Unsub {
        // One registry is shared by the outer observer and the returned handle.
        let registry: RcSubscriptions<C> = C::RcMut::from(DynamicSubscriptions::default());
        let max_active = self.concurrent;

        let outer_context = context.transform(|downstream| MergeAllOuterObserver {
            state: C::RcMut::from(MergeAllState {
                observer: Some(downstream),
                pending_observables: VecDeque::new(),
                subscribed: 0,
                concurrent: max_active,
                outer_completed: false,
            }),
            sub_state: registry.clone(),
        });

        let src_unsub = self.source.subscribe(outer_context);
        crate::subscription::SourceWithDynamicSubs::new(src_unsub, registry)
    }
}
/// Forwards the notifications of one inner observable to the shared
/// downstream observer and hands its concurrency slot on when it completes.
impl<Item, Err, O, InnerObs, RcState, RcUnsubs, DynUnsub> Observer<Item, Err>
    for MergeAllInnerObserver<RcState, RcUnsubs>
where
    O: Observer<Item, Err>,
    RcState: RcDerefMut<Target = MergeAllState<O, InnerObs>> + Clone,
    RcUnsubs: RcDerefMut<Target = DynamicSubscriptions<DynUnsub>> + Clone,
{
    /// Passes `value` to the downstream observer if it is still present.
    fn next(&mut self, value: Item) {
        // The state stays borrowed for the duration of the downstream call.
        let mut shared = self.state.rc_deref_mut();
        let Some(downstream) = shared.observer.as_mut() else {
            return;
        };
        downstream.next(value);
    }

    /// Takes the downstream observer out of the shared state and forwards
    /// `err` to it; does nothing if the observer is already gone.
    fn error(self, err: Err) {
        let mut shared = self.state.rc_deref_mut();
        let Some(downstream) = shared.observer.take() else {
            return;
        };
        downstream.error(err);
    }

    /// Handles completion of this inner observable.
    ///
    /// The subscription is removed from the registry first. If other inner
    /// observables are queued, the slot is reused for the next one through
    /// `subscribe_fn` (the `subscribed` count is left untouched). Otherwise
    /// the slot is released, and when it was the last one and the outer
    /// source has completed, completion is forwarded downstream.
    fn complete(self) {
        // The registry borrow is a temporary and ends with this statement.
        self.sub_state
            .rc_deref_mut()
            .remove(self.subscription_id);

        let mut shared = self.state.rc_deref_mut();
        if shared.pending_observables.is_empty() {
            shared.subscribed = shared.subscribed.saturating_sub(1);
            if shared.subscribed > 0 || !shared.outer_completed {
                return;
            }
            if let Some(downstream) = shared.observer.take() {
                downstream.complete();
            }
        } else {
            // Release the state borrow before subscribing the next inner
            // observable; `subscribe_fn` borrows the same state again.
            drop(shared);
            (self.subscribe_fn)(self.state.clone(), self.sub_state.clone());
        }
    }

    /// Closed as soon as the shared downstream observer is gone or reports
    /// itself closed.
    fn is_closed(&self) -> bool {
        let shared = self.state.rc_deref();
        shared.observer_is_closed::<Item, Err>()
    }
}
/// Receives the inner observables emitted by the outer source, queues them
/// and starts them while concurrency slots are available.
impl<Ctx, Err, O, RcState, RcUnsubs> Observer<Ctx, Err> for MergeAllOuterObserver<RcState, RcUnsubs>
where
    Ctx: Context<
        Inner: CoreObservable<
            Ctx::With<MergeAllInnerObserver<RcState, RcUnsubs>>,
            Unsub: IntoBoxedSubscription<Ctx::BoxedSubscription>,
        >,
    >,
    O: for<'a> Observer<<Ctx::Inner as ObservableType>::Item<'a>, Err>,
    RcState: RcDerefMut<Target = MergeAllState<O, Ctx::Inner>> + Clone,
    RcUnsubs: RcDerefMut<Target = DynamicSubscriptions<Ctx::BoxedSubscription>> + Clone,
{
    /// Queues the received inner observable and, if fewer than `concurrent`
    /// slots are occupied, claims a slot and subscribes the oldest queued one.
    fn next(&mut self, inner_ctx_obs: Ctx) {
        let inner_obs = inner_ctx_obs.into_inner();

        // Decide under a short-lived borrow; the borrow must be gone before
        // `do_next_subscribe` borrows the state again.
        let slot_claimed = {
            let mut shared = self.state.rc_deref_mut();
            shared.pending_observables.push_back(inner_obs);
            let has_room = shared.subscribed < shared.concurrent;
            if has_room {
                shared.subscribed += 1;
            }
            has_room
        };

        if slot_claimed {
            Self::do_next_subscribe::<Ctx, _, _>(self.state.clone(), self.sub_state.clone());
        }
    }

    /// Takes the downstream observer out of the shared state and forwards
    /// `err` to it; does nothing if the observer is already gone.
    fn error(self, err: Err) {
        let mut shared = self.state.rc_deref_mut();
        let Some(downstream) = shared.observer.take() else {
            return;
        };
        downstream.error(err);
    }

    /// Records that the outer source completed. Completion is forwarded
    /// downstream immediately only when no slot is occupied and nothing is
    /// queued; otherwise it is left to the last inner observer.
    fn complete(self) {
        let mut shared = self.state.rc_deref_mut();
        shared.outer_completed = true;

        let still_busy = shared.subscribed != 0 || !shared.pending_observables.is_empty();
        if still_busy {
            return;
        }
        if let Some(downstream) = shared.observer.take() {
            downstream.complete();
        }
    }

    /// Closed as soon as the shared downstream observer is gone or reports
    /// itself closed.
    fn is_closed(&self) -> bool { self.state.rc_deref().observer_is_closed() }
}
impl<RcState, RcUnsubs> MergeAllOuterObserver<RcState, RcUnsubs> {
    /// Subscribes the oldest queued inner observable, if there is one.
    ///
    /// This is the single place where inner subscriptions are started. It is
    /// called by the outer observer after it has claimed a concurrency slot,
    /// and by an inner observer (through its `subscribe_fn` pointer) when it
    /// completes while other inner observables are still queued.
    ///
    /// If the downstream observer has already been taken out of the shared
    /// state (a terminal notification was forwarded), nothing can be
    /// delivered any more. In that case the queue is discarded and no new
    /// subscription is started, so queued inner observables are never
    /// subscribed after the merged stream has terminated.
    ///
    /// * `state` - shared merge bookkeeping.
    /// * `sub_state` - shared registry that stores the inner subscriptions.
    fn do_next_subscribe<Ctx, O, InnerObs>(state: RcState, sub_state: RcUnsubs)
    where
        Ctx: Context,
        RcState: RcDerefMut<Target = MergeAllState<O, InnerObs>> + Clone,
        RcUnsubs: RcDerefMut<Target = DynamicSubscriptions<Ctx::BoxedSubscription>> + Clone,
        InnerObs: CoreObservable<
            Ctx::With<MergeAllInnerObserver<RcState, RcUnsubs>>,
            Unsub: IntoBoxedSubscription<Ctx::BoxedSubscription>,
        >,
    {
        let mut shared = state.rc_deref_mut();
        if shared.observer.is_none() {
            // Downstream is gone: starting another inner observable would only
            // run its side effects for output nobody can receive.
            shared.pending_observables.clear();
            return;
        }
        let inner_obs = shared.pending_observables.pop_front();
        // Release the borrow before subscribing: the inner observer created
        // below borrows the same state when it is notified.
        drop(shared);

        if let Some(inner_obs) = inner_obs {
            // The id is reserved up front so the inner observer can carry it
            // and remove its own entry from the registry on completion.
            let subscription_id = sub_state.rc_deref_mut().reserve_id();
            let inner_observer = MergeAllInnerObserver {
                state: state.clone(),
                sub_state: sub_state.clone(),
                subscription_id,
                subscribe_fn: Self::do_next_subscribe::<Ctx, _, _>,
            };
            let unsub = inner_obs.subscribe(Ctx::lift(inner_observer));
            let boxed_unsub = unsub.into_boxed();
            sub_state
                .rc_deref_mut()
                .insert(subscription_id, boxed_unsub);
        }
    }
}
// Unit tests for `merge_all` / `concat_all`: ordering, completion,
// concurrency limiting, error forwarding and unsubscription.
#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};
use crate::prelude::*;
// Two synchronous inner sources with an effectively unlimited concurrency
// limit: all values arrive, in source order.
#[rxrust_macro::test]
fn test_merge_all_basic() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
Local::from_iter([Local::from_iter([1, 2]), Local::from_iter([3, 4])])
.merge_all(usize::MAX)
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3, 4]);
}
// Same input through `concat_all` (defined elsewhere; presumably
// `merge_all` with a concurrency of one): values arrive strictly in order.
#[rxrust_macro::test]
fn test_concat_all() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
Local::from_iter([Local::from_iter([1, 2]), Local::from_iter([3, 4])])
.concat_all()
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3, 4]);
}
// An outer source that emits no inner observables completes without
// emitting any value.
#[rxrust_macro::test]
fn test_merge_all_empty() {
let result = Rc::new(RefCell::new(Vec::<i32>::new()));
let result_clone = result.clone();
let completed = Rc::new(RefCell::new(false));
let completed_clone = completed.clone();
Local::from_iter(Vec::<Local<FromIter<std::vec::IntoIter<i32>>>>::new())
.merge_all(usize::MAX)
.on_complete(move || {
*completed_clone.borrow_mut() = true;
})
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert!(result.borrow().is_empty());
assert!(*completed.borrow());
}
// The outer source finishes immediately but its only inner observable is
// delayed by 20 ms: the subscription must stay open, with no value and no
// completion, until the inner observable has run.
#[rxrust_macro::test(local)]
async fn test_merge_all_subscription_stays_open_while_inner_active() {
use std::time::Duration;
use crate::{scheduler::LocalScheduler, subscription::Subscription};
let values = Rc::new(RefCell::new(Vec::new()));
let values_c = values.clone();
let completed = Rc::new(RefCell::new(false));
let completed_c = completed.clone();
let subscription = Local::of(Local::of(42).delay_subscription(Duration::from_millis(20)))
.merge_all(usize::MAX)
.on_complete(move || *completed_c.borrow_mut() = true)
.subscribe(move |v| values_c.borrow_mut().push(v));
// Before the delay elapses nothing has happened yet.
assert!(!subscription.is_closed());
assert!(values.borrow().is_empty());
assert!(!*completed.borrow());
// Wait longer than the 20 ms delay so the inner observable can run.
LocalScheduler
.sleep(Duration::from_millis(40))
.await;
assert_eq!(*values.borrow(), vec![42]);
assert!(*completed.borrow());
assert!(subscription.is_closed());
}
// With a limit of 2, the third inner subject stays queued until one of the
// first two completes; values it emits while queued are not observed.
#[rxrust_macro::test]
fn test_merge_all_concurrency() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut s1 = Local::subject();
let mut s2 = Local::subject();
let mut s3 = Local::subject();
let mut outer = Local::subject();
let _subscription = outer.clone().merge_all(2).subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
outer.next(s1.clone());
outer.next(s2.clone());
outer.next(s3.clone());
s1.next(1);
s2.next(2);
// `s3` is still queued, so this value is dropped.
s3.next(3);
assert_eq!(*result.borrow(), vec![1, 2]);
// Completing `s1` frees a slot; `s3` is subscribed and its next value flows.
s1.complete();
s3.next(4);
assert_eq!(*result.borrow(), vec![1, 2, 4]);
}
// An error raised by an inner observable reaches the downstream error handler.
#[rxrust_macro::test]
fn test_merge_all_inner_error() {
let error_called = Rc::new(RefCell::new(false));
let error_called_clone = error_called.clone();
let s1 = Local::subject();
let mut outer = Local::subject();
outer
.clone()
.merge_all(usize::MAX)
.on_error(move |_| {
*error_called_clone.borrow_mut() = true;
})
.subscribe(|_: i32| {});
outer.next(s1.clone());
s1.error(());
assert!(*error_called.borrow());
}
// An error raised by the outer source reaches the downstream error handler.
#[rxrust_macro::test]
fn test_merge_all_outer_error() {
let error_called = Rc::new(RefCell::new(false));
let error_called_clone = error_called.clone();
let outer = Local::subject();
let inner_dummy = Local::subject::<i32, ()>();
let mut outer_clone = outer.clone();
outer
.clone()
.merge_all(usize::MAX)
.on_error(move |_| {
*error_called_clone.borrow_mut() = true;
})
.subscribe(|_| {});
// Never executed; presumably present only so the compiler can infer the
// item type of `outer` from this call.
if false {
outer_clone.next(inner_dummy.clone());
}
outer.error(());
assert!(*error_called.borrow());
}
// After `unsubscribe`, values emitted by the inner subjects are no longer
// delivered.
#[rxrust_macro::test]
fn test_merge_all_unsubscribe() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let mut s1 = Local::subject();
let mut s2 = Local::subject();
let mut outer = Local::subject();
let subscription = outer
.clone()
.merge_all(usize::MAX)
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
outer.next(s1.clone());
outer.next(s2.clone());
s1.next(1);
s2.next(2);
assert_eq!(*result.borrow(), vec![1, 2]);
subscription.unsubscribe();
// Emissions after unsubscription must not change the result.
s1.next(3);
s2.next(4);
assert_eq!(*result.borrow(), vec![1, 2]);
}
}