use super::{
subject_subscription::{RemoveState, SubjectSubscription, SubscriptionState},
subscribers::Subscribers,
};
use crate::{
context::{Context, MutArc, MutRc, RcDeref, RcDerefMut, SharedCell},
observable::{CoreObservable, ObservableType},
observer::{
BoxedObserver, BoxedObserverMutRef, BoxedObserverMutRefSend, BoxedObserverSend,
IntoBoxedObserver, Observer,
},
scheduler::{Scheduler, Task, TaskState},
};
pub type SubjectPtr<'a, C, Item, Err> =
<C as Context>::RcMut<Subscribers<<C as Context>::BoxedObserver<'a, Item, Err>>>;
pub type SubjectPtrMutRef<'a, C, Item, Err> =
<C as Context>::RcMut<Subscribers<<C as Context>::BoxedObserverMutRef<'a, Item, Err>>>;
pub type InnerSubject<'a, Item, Err> = Subject<MutRc<Subscribers<BoxedObserver<'a, Item, Err>>>>;
pub type InnerSubjectSend<'a, Item, Err> =
Subject<MutArc<Subscribers<BoxedObserverSend<'a, Item, Err>>>>;
pub type InnerSubjectMutRef<'a, Item, Err> =
Subject<MutRc<Subscribers<BoxedObserverMutRef<'a, Item, Err>>>>;
pub type InnerSubjectMutRefSend<'a, Item, Err> =
Subject<MutArc<Subscribers<BoxedObserverMutRefSend<'a, Item, Err>>>>;
#[cfg(feature = "scheduler")]
pub type LocalSubject<'a, Item, Err> = Local<InnerSubject<'a, Item, Err>>;
#[cfg(feature = "scheduler")]
pub type SharedSubject<'a, Item, Err> = Shared<InnerSubjectSend<'a, Item, Err>>;
#[cfg(feature = "scheduler")]
pub type LocalSubjectMutRef<'a, Item, Err> = Local<InnerSubjectMutRef<'a, Item, Err>>;
#[cfg(feature = "scheduler")]
pub type SharedSubjectMutRef<'a, Item, Err> = Shared<InnerSubjectMutRefSend<'a, Item, Err>>;
pub struct Subject<P> {
pub observers: P,
}
#[cfg(feature = "scheduler")]
use crate::context::{Local, Shared};
#[cfg(feature = "scheduler")]
use crate::scheduler::{LocalScheduler, SharedScheduler};
#[cfg(feature = "scheduler")]
impl<'a, Item, Err> Subject<MutRc<Subscribers<BoxedObserver<'a, Item, Err>>>> {
pub fn local() -> Local<Self> { Local { inner: Self::default(), scheduler: LocalScheduler } }
}
#[cfg(feature = "scheduler")]
impl<'a, Item, Err> Subject<MutArc<Subscribers<BoxedObserverSend<'a, Item, Err>>>> {
pub fn shared() -> Shared<Self> { Shared { inner: Self::default(), scheduler: SharedScheduler } }
}
#[cfg(feature = "scheduler")]
impl<'a, Item: 'a, Err> Subject<MutRc<Subscribers<BoxedObserverMutRef<'a, Item, Err>>>> {
pub fn local_mut_ref() -> Local<Self> {
Local { inner: Self::default(), scheduler: LocalScheduler }
}
}
#[cfg(feature = "scheduler")]
impl<'a, Item: 'a, Err> Subject<MutArc<Subscribers<BoxedObserverMutRefSend<'a, Item, Err>>>> {
pub fn shared_mut_ref() -> Shared<Self> {
Shared { inner: Self::default(), scheduler: SharedScheduler }
}
}
impl<P, O> Subject<P>
where
P: RcDeref<Target = Subscribers<O>>,
{
pub fn subscriber_count(&self) -> usize { self.observers.rc_deref().inner.len() }
pub fn is_empty(&self) -> bool { self.observers.rc_deref().inner.is_empty() }
}
impl<P: Clone> Clone for Subject<P> {
fn clone(&self) -> Self { Self { observers: self.observers.clone() } }
}
impl<P> Default for Subject<P>
where
P: RcDeref<Target: Default> + From<P::Target>,
{
fn default() -> Self { Self { observers: P::from(P::Target::default()) } }
}
macro_rules! impl_observer_for_subject {
(value, $ptr:ident, $obs:ident) => {
#[allow(coherence_leak_check)]
impl<'a, Item, Err> Observer<Item, Err> for Subject<$ptr<Subscribers<$obs<'a, Item, Err>>>>
where
Item: Clone,
Err: Clone,
{
fn next(&mut self, value: Item) {
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
guard.broadcast_value(value);
return;
}
panic!(
"re-entrant Subject emissions are not supported (next/error/complete). Use an explicit \
async boundary (e.g. delay(0)) if you need feedback loops."
);
}
fn error(self, err: Err) {
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
guard.broadcast_error(err);
return;
}
panic!(
"re-entrant Subject emissions are not supported (next/error/complete). Use an explicit \
async boundary (e.g. delay(0)) if you need feedback loops."
);
}
fn complete(self) {
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
guard.broadcast_complete();
return;
}
panic!(
"re-entrant Subject emissions are not supported (next/error/complete). Use an explicit \
async boundary (e.g. delay(0)) if you need feedback loops."
);
}
fn is_closed(&self) -> bool { self.observers.rc_deref().inner.is_empty() }
}
};
(mut_ref, $ptr:ident, $obs:ident) => {
#[allow(coherence_leak_check)]
impl<'a, Item, Err> Observer<&mut Item, Err> for Subject<$ptr<Subscribers<$obs<'a, Item, Err>>>>
where
Err: Clone,
{
fn next(&mut self, value: &mut Item) {
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
guard.broadcast_mut_ref(value);
return;
}
panic!(
"re-entrant Subject emissions are not supported (next/error/complete). Use an explicit \
async boundary (e.g. delay(0)) if you need feedback loops."
);
}
fn error(self, err: Err) {
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
guard.broadcast_error(err);
return;
}
panic!(
"re-entrant Subject emissions are not supported (next/error/complete). Use an explicit \
async boundary (e.g. delay(0)) if you need feedback loops."
);
}
fn complete(self) {
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
guard.broadcast_complete();
return;
}
panic!(
"re-entrant Subject emissions are not supported (next/error/complete). Use an explicit \
async boundary (e.g. delay(0)) if you need feedback loops."
);
}
fn is_closed(&self) -> bool { self.observers.rc_deref().inner.is_empty() }
}
};
}
impl_observer_for_subject!(value, MutRc, BoxedObserver);
impl_observer_for_subject!(value, MutArc, BoxedObserverSend);
impl_observer_for_subject!(mut_ref, MutRc, BoxedObserverMutRef);
impl_observer_for_subject!(mut_ref, MutArc, BoxedObserverMutRefSend);
macro_rules! impl_core_observable_for_subject {
($ptr:ident, $obs:ident, $item_type:ty) => {
#[allow(coherence_leak_check)]
impl<'a, Item, Err> ObservableType for Subject<$ptr<Subscribers<$obs<'a, Item, Err>>>> {
type Item<'m>
= $item_type
where
Self: 'm;
type Err = Err;
}
#[allow(coherence_leak_check)]
impl<'a, Item, Err, C> CoreObservable<C> for Subject<$ptr<Subscribers<$obs<'a, Item, Err>>>>
where
C: Context,
C::Inner: IntoBoxedObserver<$obs<'a, Item, Err>>,
C::Scheduler: Scheduler<
Task<
AddState<
$ptr<Subscribers<$obs<'a, Item, Err>>>,
$obs<'a, Item, Err>,
C::RcCell<SubscriptionState>,
>,
>,
>,
C::Scheduler: Scheduler<Task<RemoveState<$ptr<Subscribers<$obs<'a, Item, Err>>>>>>,
{
type Unsub = SubjectSubscription<
$ptr<Subscribers<$obs<'a, Item, Err>>>,
C::Scheduler,
C::RcCell<SubscriptionState>,
>;
fn subscribe(self, observer: C) -> Self::Unsub {
let scheduler = observer.scheduler().clone();
let boxed = observer.into_inner().into_boxed();
let observers_ptr = self.observers.clone();
let state = C::RcCell::<SubscriptionState>::from(SubscriptionState::Pending);
if let Some(mut guard) = self.observers.try_rc_deref_mut() {
let id = guard.add(boxed);
state.set(SubscriptionState::Ready(id));
return SubjectSubscription::new(observers_ptr, state, scheduler);
}
let sub = SubjectSubscription::new(observers_ptr, state.clone(), scheduler.clone());
let observers = self.observers;
let task = Task::new(AddState { observers, boxed: Some(boxed), state }, |task_state| {
let current_state = task_state.state.get();
if current_state == SubscriptionState::Cancelled {
return TaskState::Finished;
}
let boxed = task_state
.boxed
.take()
.expect("add executed twice");
let mut guard = task_state.observers.rc_deref_mut();
let id = guard.add(boxed);
if task_state
.state
.compare_exchange(SubscriptionState::Pending, SubscriptionState::Ready(id))
.is_err()
{
let _ = guard.remove(id);
}
TaskState::Finished
});
let _handle = scheduler.schedule(task, None);
sub
}
}
};
}
struct AddState<P, Ob, Cell> {
observers: P,
boxed: Option<Ob>,
state: Cell,
}
impl_core_observable_for_subject!(MutRc, BoxedObserver, Item);
impl_core_observable_for_subject!(MutArc, BoxedObserverSend, Item);
impl_core_observable_for_subject!(MutRc, BoxedObserverMutRef, &'m mut Item);
impl_core_observable_for_subject!(MutArc, BoxedObserverMutRefSend, &'m mut Item);
#[cfg(test)]
mod tests {
use std::{
cell::RefCell,
convert::Infallible,
rc::Rc,
sync::{Arc, Mutex},
};
use super::*;
use crate::{observable::connectable::Connectable, prelude::*};
#[rxrust_macro::test]
fn test_local_subject() {
let subject = Local::subject();
let results = Rc::new(RefCell::new(vec![]));
let c_results = results.clone();
subject.clone().subscribe(move |v| {
c_results.borrow_mut().push(v);
});
subject.clone().next(1);
subject.clone().next(2);
assert_eq!(*results.borrow(), vec![1, 2]);
}
#[rxrust_macro::test]
fn test_local_subject_multiple_subscribers() {
let subject = Local::subject();
let results1 = Rc::new(RefCell::new(vec![]));
let results2 = Rc::new(RefCell::new(vec![]));
let c1 = results1.clone();
subject
.clone()
.subscribe(move |v| c1.borrow_mut().push(v));
subject.clone().next(1);
let c2 = results2.clone();
subject
.clone()
.subscribe(move |v| c2.borrow_mut().push(v));
subject.clone().next(2);
assert_eq!(*results1.borrow(), vec![1, 2]);
assert_eq!(*results2.borrow(), vec![2]);
}
#[rxrust_macro::test]
fn test_shared_subject() {
let subject = Shared::subject();
let results = Arc::new(Mutex::new(vec![]));
let c_results = results.clone();
subject.clone().subscribe(move |v| {
c_results.lock().unwrap().push(v);
});
let mut s = subject.clone();
s.next(1);
s.next(2);
assert_eq!(*results.lock().unwrap(), vec![1, 2]);
}
#[rxrust_macro::test]
fn test_unsubscribe() {
use crate::subscription::Subscription;
let subject = Local::subject();
let results = Rc::new(RefCell::new(vec![]));
let c_results = results.clone();
let sub = subject.clone().subscribe(move |v| {
c_results.borrow_mut().push(v);
});
subject.clone().next(1);
sub.unsubscribe();
subject.clone().next(2);
assert_eq!(*results.borrow(), vec![1]);
}
#[cfg(all(feature = "scheduler", not(target_arch = "wasm32")))]
#[rxrust_macro::test(local)]
async fn test_subject_unsubscribe_inside_next_is_deferred() {
use crate::subscription::Subscription;
let subject = Local::subject::<i32, Infallible>();
let results_primary = Rc::new(RefCell::new(vec![]));
let results_secondary = Rc::new(RefCell::new(vec![]));
let sub_secondary = subject.clone().subscribe({
let results_secondary = results_secondary.clone();
move |v| results_secondary.borrow_mut().push(v)
});
let sub_secondary_cell = Rc::new(RefCell::new(Some(sub_secondary)));
subject.clone().subscribe({
let _subject = subject.clone();
let results_primary = results_primary.clone();
let sub_secondary_cell = sub_secondary_cell.clone();
move |v| {
results_primary.borrow_mut().push(v);
if v == 1
&& let Some(sub) = sub_secondary_cell.borrow_mut().take()
{
sub.unsubscribe();
}
}
});
subject.clone().next(1);
crate::scheduler::tokio::task::yield_now().await;
crate::scheduler::tokio::task::yield_now().await;
assert_eq!(*results_primary.borrow(), vec![1]);
assert_eq!(*results_secondary.borrow(), vec![1]);
subject.clone().next(2);
crate::scheduler::tokio::task::yield_now().await;
assert_eq!(*results_primary.borrow(), vec![1, 2]);
assert_eq!(*results_secondary.borrow(), vec![1]);
}
#[cfg(all(feature = "scheduler", not(target_arch = "wasm32")))]
#[rxrust_macro::test(local)]
async fn test_subject_subscribe_inside_next_is_deferred() {
use crate::subscription::Subscription;
let subject = Local::subject::<i32, Infallible>();
let primary = Rc::new(RefCell::new(vec![]));
let secondary = Rc::new(RefCell::new(vec![]));
let secondary_sub: Rc<RefCell<Option<_>>> = Rc::new(RefCell::new(None));
subject.clone().subscribe({
let subject = subject.clone();
let primary = primary.clone();
let secondary = secondary.clone();
let secondary_sub = secondary_sub.clone();
move |v| {
primary.borrow_mut().push(v);
if v == 1 {
let sub = subject.clone().subscribe({
let secondary = secondary.clone();
move |v| secondary.borrow_mut().push(v)
});
*secondary_sub.borrow_mut() = Some(sub);
}
}
});
subject.clone().next(1);
crate::scheduler::tokio::task::yield_now().await;
crate::scheduler::tokio::task::yield_now().await;
assert_eq!(*primary.borrow(), vec![1]);
assert_eq!(*secondary.borrow(), Vec::<i32>::new());
subject.clone().next(2);
crate::scheduler::tokio::task::yield_now().await;
assert_eq!(*primary.borrow(), vec![1, 2]);
assert_eq!(*secondary.borrow(), vec![2]);
if let Some(sub) = secondary_sub.borrow_mut().take() {
sub.unsubscribe();
}
}
#[cfg(all(feature = "scheduler", not(target_arch = "wasm32")))]
#[rxrust_macro::test(local)]
async fn test_subject_unsubscribe_cancels_pending_subscribe() {
let subject = Local::subject::<i32, Infallible>();
let secondary = Rc::new(RefCell::new(vec![]));
subject.clone().subscribe({
let subject = subject.clone();
let secondary = secondary.clone();
move |v| {
if v == 1 {
let sub = subject.clone().subscribe({
let secondary = secondary.clone();
move |v| secondary.borrow_mut().push(v)
});
sub.unsubscribe();
}
}
});
subject.clone().next(1);
crate::scheduler::tokio::task::yield_now().await;
crate::scheduler::tokio::task::yield_now().await;
subject.clone().next(2);
crate::scheduler::tokio::task::yield_now().await;
assert!(secondary.borrow().is_empty());
}
#[cfg(not(target_arch = "wasm32"))]
#[rxrust_macro::test]
fn test_subject_reentrant_next_panics() {
use std::panic::{AssertUnwindSafe, catch_unwind};
let subject = Local::subject::<i32, Infallible>();
subject.clone().subscribe({
let subject = subject.clone();
move |_| {
subject.clone().next(2);
}
});
let result = catch_unwind(AssertUnwindSafe(|| {
subject.clone().next(1);
}));
assert!(result.is_err());
}
#[cfg(not(target_arch = "wasm32"))]
#[rxrust_macro::test]
fn test_subject_reentrant_complete_panics() {
use std::panic::{AssertUnwindSafe, catch_unwind};
let subject = Local::subject::<i32, Infallible>();
subject.clone().subscribe({
let subject = subject.clone();
move |_| {
subject.clone().complete();
}
});
let result = catch_unwind(AssertUnwindSafe(|| {
subject.clone().next(1);
}));
assert!(result.is_err());
}
#[cfg(not(target_arch = "wasm32"))]
#[rxrust_macro::test]
fn test_subject_reentrant_error_panics() {
use std::panic::{AssertUnwindSafe, catch_unwind};
struct ReentrantError<S> {
subject: S,
}
impl<S> Observer<i32, ()> for ReentrantError<S>
where
S: Clone + Observer<i32, ()>,
{
fn next(&mut self, _value: i32) { self.subject.clone().error(()); }
fn error(self, _err: ()) {}
fn complete(self) {}
fn is_closed(&self) -> bool { false }
}
let subject = Local::subject::<i32, ()>();
subject
.clone()
.subscribe_with(ReentrantError { subject: subject.clone() });
let result = catch_unwind(AssertUnwindSafe(|| {
subject.clone().next(1);
}));
assert!(result.is_err());
}
#[rxrust_macro::test]
fn test_mut_ref_subject_emit() {
let subject = Local::subject_mut_ref();
let mut producer = subject.clone();
subject.clone().subscribe(|v: &mut i32| *v += 1);
subject.subscribe(|v: &mut i32| *v *= 2);
let mut value = 10;
producer.next(&mut value);
assert_eq!(value, 22);
}
#[rxrust_macro::test]
fn test_behavior_subject() {
let mut bs = Shared::behavior_subject::<i32, Infallible>(0);
bs.next(1);
let results = std::sync::Arc::new(std::sync::Mutex::new(vec![]));
let results_clone = results.clone();
bs.clone().subscribe(move |v| {
results_clone.lock().unwrap().push(v);
});
assert_eq!(*results.lock().unwrap(), vec![1]);
bs.next(2);
assert_eq!(*results.lock().unwrap(), vec![1, 2]);
}
#[rxrust_macro::test]
fn test_scoped_subject() {
let x = 100;
let subject = Local::subject();
let mut producer = subject.clone();
subject.clone().subscribe(move |v| {
assert_eq!(v + x, 110);
});
producer.next(10);
}
#[rxrust_macro::test]
fn test_behavior_subject_multiple_subscribers() {
let mut bs = Shared::behavior_subject::<i32, Infallible>(0);
let results1 = std::sync::Arc::new(std::sync::Mutex::new(vec![]));
let results2 = std::sync::Arc::new(std::sync::Mutex::new(vec![]));
let r1 = results1.clone();
let r2 = results2.clone();
bs.clone().subscribe(move |v| {
r1.lock().unwrap().push(v);
});
bs.clone().subscribe(move |v| {
r2.lock().unwrap().push(v);
});
assert_eq!(*results1.lock().unwrap(), vec![0]);
assert_eq!(*results2.lock().unwrap(), vec![0]);
bs.next(1);
assert_eq!(*results1.lock().unwrap(), vec![0, 1]);
assert_eq!(*results2.lock().unwrap(), vec![0, 1]);
bs.next(2);
assert_eq!(*results1.lock().unwrap(), vec![0, 1, 2]);
assert_eq!(*results2.lock().unwrap(), vec![0, 1, 2]);
}
#[rxrust_macro::test]
fn test_subject_with_publish_integration() {
use crate::observable::Observable;
let source = Local::of(1).merge(Local::of(2));
let connectable = source.publish();
let results = Rc::new(RefCell::new(vec![]));
let results_clone = results.clone();
let obs1 = connectable.fork();
let obs2 = connectable.fork();
obs1.subscribe(move |v| {
results_clone
.borrow_mut()
.push(format!("obs1: {}", v));
});
let results_clone2 = results.clone();
obs2.subscribe(move |v| {
results_clone2
.borrow_mut()
.push(format!("obs2: {}", v));
});
connectable.connect();
let received = results.borrow();
assert!(received.contains(&"obs1: 1".to_string()));
assert!(received.contains(&"obs1: 2".to_string()));
assert!(received.contains(&"obs2: 1".to_string()));
assert!(received.contains(&"obs2: 2".to_string()));
}
#[rxrust_macro::test]
fn test_subject_with_publish_mut_ref_integration() {
use crate::observable::Observable;
let mut source = Local::subject_mut_ref();
let connectable = source.clone().publish_mut_ref();
let results = Rc::new(RefCell::new(vec![]));
let results_clone = results.clone();
let obs1 = connectable.fork();
let obs2 = connectable.fork();
obs1.subscribe(move |v: &mut i32| {
*v += 1;
results_clone
.borrow_mut()
.push(format!("obs1: {}", v));
});
let results_clone2 = results.clone();
obs2.subscribe(move |v: &mut i32| {
*v *= 2;
results_clone2
.borrow_mut()
.push(format!("obs2: {}", v));
});
let _connection = connectable.connect();
let mut test_value = 10;
source.next(&mut test_value);
let received = results.borrow();
assert!(received.contains(&"obs1: 11".to_string()));
assert!(received.contains(&"obs2: 22".to_string()));
}
#[rxrust_macro::test]
fn test_subject_multicast_with_different_subjects() {
use crate::observable::Observable;
let source = Local::of(42);
let custom_subject = Local::subject();
let connectable = source.multicast(custom_subject.into_inner());
let results = Rc::new(RefCell::new(vec![]));
let results_clone = results.clone();
let obs = connectable.fork();
obs.subscribe(move |v| {
results_clone.borrow_mut().push(v);
});
connectable.connect();
assert_eq!(*results.borrow(), vec![42]);
}
#[rxrust_macro::test]
fn test_subject_multicast_mut_ref_with_different_subjects() {
use crate::observable::Observable;
let mut source = Local::subject_mut_ref();
let custom_subject = Local::subject_mut_ref();
let connectable = source
.clone()
.multicast_mut_ref(custom_subject.into_inner());
let results = Rc::new(RefCell::new(vec![]));
let results_clone = results.clone();
let obs1 = connectable.fork();
let obs2 = connectable.fork();
obs1.subscribe(move |v: &mut i32| {
*v += 100;
results_clone
.borrow_mut()
.push(format!("obs1: {}", v));
});
let results_clone2 = results.clone();
obs2.subscribe(move |v: &mut i32| {
*v *= 3;
results_clone2
.borrow_mut()
.push(format!("obs2: {}", v));
});
let _connection = connectable.connect();
let mut test_value = 5;
source.next(&mut test_value);
let received = results.borrow();
assert!(received.contains(&"obs1: 105".to_string()));
assert!(received.contains(&"obs2: 315".to_string()));
}
#[rxrust_macro::test]
fn test_subject_early_vs_late_subscription() {
use crate::observable::Observable;
let subject = Local::subject();
let early_results = Rc::new(RefCell::new(vec![]));
let early_results_clone = early_results.clone();
subject.clone().subscribe(move |v| {
early_results_clone.borrow_mut().push(v);
});
subject.clone().next(1);
let late_results = Rc::new(RefCell::new(vec![]));
let late_results_clone = late_results.clone();
subject.clone().subscribe(move |v| {
late_results_clone.borrow_mut().push(v);
});
subject.clone().next(2);
assert_eq!(*early_results.borrow(), vec![1, 2]);
assert_eq!(*late_results.borrow(), vec![2]);
}
}