use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;
use super::rx_ref::RxRef;
use super::rx_subject::RxSubject;
use super::rx_val::RxVal;
use super::tracker::Tracker;
/// One registered callback: a boxed `FnMut(&T)` behind `Rc<RefCell<..>>` so the
/// same callback can be held by the subscriber list and by the unsubscribe
/// closure, and compared by pointer identity (`Rc::ptr_eq`).
type Subscriber<T> = Rc<RefCell<Box<dyn FnMut(&T)>>>;
/// State shared by every clone of an `RxObservable`.
pub(super) struct RxObservableInner<T> {
/// Registered callbacks, in the order they were subscribed.
subscribers: Vec<Subscriber<T>>,
/// Type-erased value kept alive for as long as this state exists. The
/// operators in this file store the tracker(s) of their upstream
/// subscriptions here; the field is only written, never read back.
pub(super) _lifetime_tracker: Option<Rc<dyn Any>>,
}
/// Handle to a stream of `T` values that callbacks can subscribe to.
///
/// Cloning the handle is cheap: every clone points at the same shared state.
pub struct RxObservable<T> {
/// Reference-counted shared state (subscriber list and lifetime tracker).
pub(super) inner: Rc<RefCell<RxObservableInner<T>>>,
}
impl<T> Clone for RxObservable<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: 'static> RxObservable<T> {
pub fn subscribe<F>(&self, tracker: &Tracker, f: F)
where
F: FnMut(&T) + 'static,
{
let subscriber = Rc::new(RefCell::new(Box::new(f) as Box<dyn FnMut(&T)>));
let subscriber_clone = subscriber.clone();
let inner_clone = self.inner.clone();
self.inner.borrow_mut().subscribers.push(subscriber_clone);
tracker.add(move || {
inner_clone
.borrow_mut()
.subscribers
.retain(|s| !Rc::ptr_eq(s, &subscriber));
});
}
pub fn subscriber_count(&self) -> usize {
self.inner.borrow().subscribers.len()
}
}
impl<T: 'static> RxObservable<T> {
pub(crate) fn new() -> Self {
Self {
inner: Rc::new(RefCell::new(RxObservableInner {
subscribers: Vec::new(),
_lifetime_tracker: None,
})),
}
}
pub(crate) fn emit(&self, value: &T) {
let inner = self.inner.borrow();
for subscriber in &inner.subscribers {
let mut sub = subscriber.borrow_mut();
sub(value);
}
}
pub fn to_val(&self, initial: T, tracker: &Tracker) -> RxVal<T>
where
T: Clone + PartialEq,
{
let rx_ref = RxRef::new(initial);
let rx_ref_clone = rx_ref.clone();
self.subscribe(tracker, move |value| {
rx_ref_clone.set(value.clone());
});
rx_ref.val()
}
pub fn map<B, F>(&self, f: F) -> RxObservable<B>
where
B: Clone + 'static,
F: Fn(&T) -> B + 'static,
{
use super::rx_subject::RxSubject;
let subject = RxSubject::new();
let tracker = Rc::new(Tracker::new());
let subject_clone = subject.clone();
self.subscribe(&tracker, move |value| {
subject_clone.next(f(value));
});
let observable = subject.observable();
observable.inner.borrow_mut()._lifetime_tracker = Some(tracker as Rc<dyn Any>);
observable
}
pub fn flat_map_val<B, F>(&self, f: F) -> RxObservable<B>
where
B: Clone + PartialEq + 'static,
F: Fn(&T) -> RxVal<B> + 'static,
{
use super::rx_subject::RxSubject;
let subject = RxSubject::new();
let outer_tracker = Rc::new(Tracker::new());
let inner_tracker = Rc::new(RefCell::new(Tracker::new()));
let subject_clone = subject.clone();
let inner_tracker_clone = inner_tracker.clone();
let f_rc = Rc::new(f);
self.subscribe(&outer_tracker, move |outer_value| {
let new_inner = f_rc(outer_value);
*inner_tracker_clone.borrow_mut() = Tracker::new();
subject_clone.next(new_inner.get());
let subject_clone2 = subject_clone.clone();
new_inner.subscribe(&inner_tracker_clone.borrow(), move |inner_value| {
subject_clone2.next(inner_value.clone());
});
});
let observable = subject.observable();
let combined_tracker = Rc::new((outer_tracker, inner_tracker));
observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
observable
}
pub fn flat_map_ref<B, F>(&self, f: F) -> RxObservable<B>
where
B: Clone + PartialEq + 'static,
F: Fn(&T) -> RxRef<B> + 'static,
{
self.flat_map_val(move |x| f(x).val())
}
pub fn flat_map_observable<B, F>(&self, f: F) -> RxObservable<B>
where
B: Clone + 'static,
F: Fn(&T) -> RxObservable<B> + 'static,
{
use super::rx_subject::RxSubject;
let subject = RxSubject::new();
let outer_tracker = Rc::new(Tracker::new());
let inner_tracker = Rc::new(RefCell::new(Tracker::new()));
let subject_clone = subject.clone();
let inner_tracker_clone = inner_tracker.clone();
let f_rc = Rc::new(f);
self.subscribe(&outer_tracker, move |outer_value| {
let new_inner = f_rc(outer_value);
*inner_tracker_clone.borrow_mut() = Tracker::new();
let subject_clone2 = subject_clone.clone();
new_inner.subscribe(&inner_tracker_clone.borrow(), move |inner_value| {
subject_clone2.next(inner_value.clone());
});
});
let observable = subject.observable();
let combined_tracker = Rc::new((outer_tracker, inner_tracker));
observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
observable
}
pub fn flat_map_subject<B, F>(&self, f: F) -> RxObservable<B>
where
B: Clone + 'static,
F: Fn(&T) -> RxSubject<B> + 'static,
{
self.flat_map_observable(move |x| f(x).observable())
}
pub fn join_observable(&self, other: RxObservable<T>) -> RxObservable<T>
where
T: Clone,
{
use super::rx_subject::RxSubject;
let subject = RxSubject::new();
let tracker1 = Rc::new(Tracker::new());
let tracker2 = Rc::new(Tracker::new());
let subject_clone1 = subject.clone();
self.subscribe(&tracker1, move |value| {
subject_clone1.next(value.clone());
});
let subject_clone2 = subject.clone();
other.subscribe(&tracker2, move |value| {
subject_clone2.next(value.clone());
});
let observable = subject.observable();
let combined_tracker = Rc::new((tracker1, tracker2));
observable.inner.borrow_mut()._lifetime_tracker = Some(combined_tracker as Rc<dyn Any>);
observable
}
pub fn join_subject(&self, other: RxSubject<T>) -> RxObservable<T>
where
T: Clone,
{
self.join_observable(other.observable())
}
}