use super::subject_core::Subject;
use crate::{
context::Context,
observable::{CoreObservable, ObservableType},
observer::Observer,
};
pub struct BehaviorSubject<Item: Clone, P> {
pub subject: Subject<P>,
pub value: Item,
}
impl<Item: Clone, P: Clone> Clone for BehaviorSubject<Item, P> {
fn clone(&self) -> Self { Self { subject: self.subject.clone(), value: self.value.clone() } }
}
impl<Item: Clone, P> BehaviorSubject<Item, P>
where
Subject<P>: Default,
{
pub fn new(initial: Item) -> Self { Self { subject: Subject::default(), value: initial } }
}
impl<Item, Err, P> Observer<Item, Err> for BehaviorSubject<Item, P>
where
Item: Clone,
Subject<P>: Observer<Item, Err>,
{
fn next(&mut self, value: Item) {
self.value = value.clone();
self.subject.next(value);
}
fn error(self, err: Err) { self.subject.error(err); }
fn complete(self) { self.subject.complete(); }
fn is_closed(&self) -> bool { self.subject.is_closed() }
}
impl<Item, Err, P> ObservableType for BehaviorSubject<Item, P>
where
Subject<P>: ObservableType<Err = Err>,
Item: Clone,
{
type Item<'a>
= <Subject<P> as ObservableType>::Item<'a>
where
Self: 'a;
type Err = Err;
}
impl<Item, Err, C, P> CoreObservable<C> for BehaviorSubject<Item, P>
where
C: Context + Observer<Item, Err>,
Subject<P>: CoreObservable<C, Err = Err>,
Item: Clone,
{
type Unsub = <Subject<P> as CoreObservable<C>>::Unsub;
fn subscribe(self, mut observer: C) -> Self::Unsub {
observer.next(self.value.clone());
self.subject.subscribe(observer)
}
}
pub trait Behavior {
type Item;
fn peek(&self) -> Self::Item;
fn next_by(&mut self, f: impl FnOnce(Self::Item) -> Self::Item);
}
impl<Item, P> Behavior for BehaviorSubject<Item, P>
where
Item: Clone,
Self: Observer<Item, ()>,
{
type Item = Item;
fn peek(&self) -> Item { self.value.clone() }
fn next_by(&mut self, f: impl FnOnce(Self::Item) -> Self::Item) {
let new_val = f(self.peek());
self.value = new_val.clone();
self.next(new_val);
}
}
impl<C: Context<Inner: Behavior>> Behavior for C {
type Item = <C::Inner as Behavior>::Item;
fn peek(&self) -> Self::Item { self.inner().peek() }
fn next_by(&mut self, f: impl FnOnce(Self::Item) -> Self::Item) { self.inner_mut().next_by(f) }
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};
use super::*;
use crate::prelude::*;
fn create_value_capture<T>() -> (Rc<RefCell<Vec<T>>>, impl FnMut(T) + Clone)
where
T: Clone,
{
let values = Rc::new(RefCell::new(Vec::new()));
let values_clone = values.clone();
let capture = move |value: T| {
values_clone.borrow_mut().push(value);
};
(values, capture)
}
#[rxrust_macro::test]
fn test_behavior_subject_basic() {
let mut behavior = Local::behavior_subject(42);
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![42]);
behavior.next(1);
behavior.next(2);
behavior.next(3);
assert_eq!(*values.borrow(), vec![42, 1, 2, 3]);
}
#[cfg(not(target_arch = "wasm32"))]
#[rxrust_macro::test]
fn test_behavior_subject_shared() {
use std::sync::{Arc, Mutex};
let behavior = Arc::new(Mutex::new(Shared::behavior_subject(100)));
let values = Arc::new(Mutex::new(Vec::new()));
let behavior_clone = behavior.clone();
let values_clone = values.clone();
std::thread::spawn(move || {
let b = behavior_clone.lock().unwrap();
b.clone().subscribe(move |v| {
values_clone.lock().unwrap().push(v);
});
})
.join()
.unwrap();
std::thread::sleep(Duration::from_millis(10));
assert_eq!(*values.lock().unwrap(), vec![100]);
{
let mut b = behavior.lock().unwrap();
b.next(200);
b.next(300);
}
std::thread::sleep(Duration::from_millis(10));
assert_eq!(*values.lock().unwrap(), vec![100, 200, 300]);
}
#[rxrust_macro::test]
fn test_behavior_peek() {
let mut behavior = Local::behavior_subject(10);
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![10]);
behavior.next(20);
assert_eq!(*values.borrow(), vec![10, 20]);
behavior.next(30);
assert_eq!(*values.borrow(), vec![10, 20, 30]);
}
#[rxrust_macro::test]
fn test_behavior_next_by() {
let mut behavior = Local::behavior_subject(0);
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![0]);
for i in 1..=3 {
behavior.next(i);
}
assert_eq!(*values.borrow(), vec![0, 1, 2, 3]);
}
#[rxrust_macro::test]
fn test_behavior_subject_multiple_subscribers() {
let mut behavior = Local::behavior_subject(99);
let (values1, capture1) = create_value_capture();
let (values2, capture2) = create_value_capture();
let (values3, capture3) = create_value_capture();
behavior.clone().subscribe(capture1);
assert_eq!(*values1.borrow(), vec![99]);
behavior.next(100);
assert_eq!(*values1.borrow(), vec![99, 100]);
behavior.clone().subscribe(capture2);
assert_eq!(*values2.borrow(), vec![100]);
behavior.clone().subscribe(capture3);
assert_eq!(*values3.borrow(), vec![100]);
behavior.next(101);
assert_eq!(*values1.borrow(), vec![99, 100, 101]);
assert_eq!(*values2.borrow(), vec![100, 101]);
assert_eq!(*values3.borrow(), vec![100, 101]);
}
#[rxrust_macro::test]
fn test_behavior_subject_complete() {
let mut behavior = Local::behavior_subject(1);
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![1]);
behavior.next(2);
behavior.next(3);
assert_eq!(*values.borrow(), vec![1, 2, 3]);
behavior.clone().complete();
}
#[rxrust_macro::test]
fn test_behavior_subject_error() {
let mut behavior = Local::behavior_subject(10);
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![10]);
behavior.next(20);
behavior.next(30);
assert_eq!(*values.borrow(), vec![10, 20, 30]);
}
#[rxrust_macro::test]
fn test_behavior_subject_strings() {
let mut behavior = Local::behavior_subject("hello".to_string());
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec!["hello"]);
behavior.next("world".to_string());
behavior.next("rxrust".to_string());
assert_eq!(*values.borrow(), vec!["hello", "world", "rxrust"]);
}
#[rxrust_macro::test]
fn test_behavior_subject_clone() {
let mut behavior1 = Local::behavior_subject(5);
let mut behavior2 = behavior1.clone();
let (values1, capture1) = create_value_capture();
let (values2, capture2) = create_value_capture();
behavior1.clone().subscribe(capture1);
behavior2.clone().subscribe(capture2);
assert_eq!(*values1.borrow(), vec![5]);
assert_eq!(*values2.borrow(), vec![5]);
behavior1.next(10);
assert_eq!(*values1.borrow(), vec![5, 10]);
assert_eq!(*values2.borrow(), vec![5, 10]);
behavior2.next(15);
assert_eq!(*values1.borrow(), vec![5, 10, 15]);
assert_eq!(*values2.borrow(), vec![5, 10, 15]);
}
#[rxrust_macro::test]
fn test_behavior_context_wrapper() {
let mut behavior = Local::behavior_subject(42);
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![42]);
behavior.next(52);
assert_eq!(*values.borrow(), vec![42, 52]);
}
#[rxrust_macro::test]
fn test_behavior_subject_complex_type() {
#[derive(Debug, Clone, PartialEq)]
struct Point {
x: i32,
y: i32,
}
let mut behavior = Local::behavior_subject(Point { x: 0, y: 0 });
let (values, capture) = create_value_capture();
behavior.clone().subscribe(capture);
assert_eq!(*values.borrow(), vec![Point { x: 0, y: 0 }]);
behavior.next(Point { x: 1, y: 2 });
behavior.next(Point { x: 3, y: 4 });
let expected = vec![Point { x: 0, y: 0 }, Point { x: 1, y: 2 }, Point { x: 3, y: 4 }];
assert_eq!(*values.borrow(), expected);
behavior.next(Point { x: 4, y: 5 });
assert_eq!(values.borrow().last(), Some(&Point { x: 4, y: 5 }));
}
}