use crate::{
context::{Context, RcDerefMut},
observable::{CoreObservable, ObservableType},
observer::Observer,
subscription::Subscription,
};
#[derive(Clone)]
pub struct Finalize<S, F> {
pub source: S,
pub func: F,
}
pub struct FinalizeObserver<O, P> {
observer: O,
func: P,
}
impl<O, P, F, Item, Err> Observer<Item, Err> for FinalizeObserver<O, P>
where
O: Observer<Item, Err>,
P: RcDerefMut<Target = Option<F>>,
F: FnOnce(),
{
fn next(&mut self, v: Item) { self.observer.next(v); }
fn error(self, e: Err) {
self.observer.error(e);
if let Some(func) = self.func.rc_deref_mut().take() {
func();
}
}
fn complete(self) {
self.observer.complete();
if let Some(func) = self.func.rc_deref_mut().take() {
func();
}
}
fn is_closed(&self) -> bool { self.observer.is_closed() }
}
impl<S, F> ObservableType for Finalize<S, F>
where
S: ObservableType,
{
type Item<'a>
= S::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
pub struct FinalizeSubscription<U, P> {
subscription: U,
func: P,
}
impl<U, P, F> Subscription for FinalizeSubscription<U, P>
where
U: Subscription,
P: RcDerefMut<Target = Option<F>>,
F: FnOnce(),
{
fn unsubscribe(self) {
self.subscription.unsubscribe();
if let Some(func) = self.func.rc_deref_mut().take() {
func();
}
}
fn is_closed(&self) -> bool { self.subscription.is_closed() }
}
impl<S, F, C> CoreObservable<C> for Finalize<S, F>
where
C: Context,
S: CoreObservable<C::With<FinalizeObserver<C::Inner, C::RcMut<Option<F>>>>>,
F: FnOnce(),
{
type Unsub = FinalizeSubscription<S::Unsub, C::RcMut<Option<F>>>;
fn subscribe(self, context: C) -> Self::Unsub {
let Finalize { source, func } = self;
let func_ptr: C::RcMut<Option<F>> = C::RcMut::from(Some(func));
let func_clone = func_ptr.clone();
let wrapped = context.transform(|observer| FinalizeObserver { observer, func: func_ptr });
let subscription = source.subscribe(wrapped);
FinalizeSubscription { subscription, func: func_clone }
}
}
#[cfg(test)]
mod tests {
use std::{
cell::{Cell, RefCell},
rc::Rc,
};
use crate::{prelude::*, subscription::Subscription};
#[rxrust_macro::test]
fn test_finalize_on_complete_simple() {
let finalized = Rc::new(Cell::new(false));
let mut nexted = false;
let finalized_clone = finalized.clone();
Local::of(1)
.finalize(move || finalized_clone.set(true))
.subscribe(|_| nexted = true);
assert!(finalized.get());
assert!(nexted);
}
#[rxrust_macro::test]
fn test_finalize_on_complete_subject() {
let finalized = Rc::new(Cell::new(false));
let nexted = Rc::new(Cell::new(false));
let subject: Local<_> = Local::subject();
let finalized_clone = finalized.clone();
let nexted_clone = nexted.clone();
subject
.clone()
.finalize(move || finalized_clone.set(true))
.subscribe(move |_: i32| nexted_clone.set(true));
subject.clone().next(1);
subject.clone().next(2);
subject.complete();
assert!(finalized.get());
assert!(nexted.get());
}
#[rxrust_macro::test]
fn test_finalize_on_unsubscribe() {
let finalized = Rc::new(Cell::new(false));
let nexted = Rc::new(Cell::new(false));
let subject: Local<_> = Local::subject();
let finalized_clone = finalized.clone();
let nexted_clone = nexted.clone();
let subscription = subject
.clone()
.finalize(move || finalized_clone.set(true))
.subscribe(move |_: i32| nexted_clone.set(true));
subject.clone().next(1);
subject.clone().next(2);
subscription.unsubscribe();
assert!(finalized.get());
assert!(nexted.get());
}
#[rxrust_macro::test]
fn test_finalize_on_error() {
let finalized = Rc::new(Cell::new(false));
let nexted = Rc::new(Cell::new(false));
let errored = Rc::new(Cell::new(false));
let subject: Local<_> = Local::subject::<i32, &'static str>();
let finalized_clone = finalized.clone();
let nexted_clone = nexted.clone();
let errored_clone = errored.clone();
subject
.clone()
.finalize(move || finalized_clone.set(true))
.on_error(move |_| errored_clone.set(true))
.subscribe(move |_| nexted_clone.set(true));
subject.clone().next(1);
subject.clone().next(2);
subject.error("oops");
assert!(finalized.get());
assert!(errored.get());
assert!(nexted.get());
}
#[rxrust_macro::test]
fn test_finalize_only_once() {
let finalize_count = Rc::new(Cell::new(0));
let subject: Local<_> = Local::subject::<i32, &'static str>();
let finalized_clone = finalize_count.clone();
let subscription = subject
.clone()
.finalize(move || finalized_clone.set(finalized_clone.get() + 1))
.on_error(|_| {})
.subscribe(|_| {});
subject.clone().next(1);
subject.clone().next(2);
subject.clone().error("oops");
subscription.unsubscribe();
assert_eq!(finalize_count.get(), 1);
}
#[rxrust_macro::test]
fn test_finalize_chaining() {
let order = Rc::new(RefCell::new(Vec::new()));
let order_clone1 = order.clone();
let order_clone2 = order.clone();
Local::of(1)
.finalize(move || order_clone1.borrow_mut().push(1))
.finalize(move || order_clone2.borrow_mut().push(2))
.subscribe(|_| {});
assert_eq!(*order.borrow(), vec![2, 1]);
}
}