use crate::{
context::Context,
observable::{defer::Defer, *},
observer::Emitter,
scheduler::{Duration, Instant},
subject::{BehaviorSubject, Subject, SubjectPtr, SubjectPtrMutRef},
subscription::Subscription,
};
pub trait ObservableFactory: Context<Inner = ()> {
fn create<Item, Err, F, U>(f: F) -> Self::With<Create<F, Item, Err>>
where
F: FnOnce(&mut dyn Emitter<Item, Err>) -> U,
U: Subscription,
{
Self::lift(Create::new(f))
}
fn of<V>(v: V) -> Self::With<Of<V>> {
Self::lift(Of(v)) }
fn empty() -> Self::With<Empty> { Self::lift(Empty) }
fn never() -> Self::With<Never> { Self::lift(Never) }
fn throw_err<E>(error: E) -> Self::With<ThrowErr<E>> { Self::lift(ThrowErr { error }) }
fn subject<'a, Item, Err>() -> Self::With<Subject<SubjectPtr<'a, Self, Item, Err>>> {
Self::lift(Subject::default())
}
fn subject_mut_ref<'a, Item, Err>() -> Self::With<Subject<SubjectPtrMutRef<'a, Self, Item, Err>>>
{
Self::lift(Subject::default())
}
fn behavior_subject<'a, Item: Clone, Err>(
initial: Item,
) -> Self::With<BehaviorSubject<Item, SubjectPtr<'a, Self, Item, Err>>> {
Self::lift(BehaviorSubject::new(initial))
}
fn behavior_subject_mut_ref<'a, Item: Clone + 'a, Err>(
initial: Item,
) -> Self::With<BehaviorSubject<Item, SubjectPtrMutRef<'a, Self, Item, Err>>> {
Self::lift(BehaviorSubject::new(initial))
}
fn from_iter<I: IntoIterator>(iter: I) -> Self::With<FromIter<I>> { Self::lift(from_iter(iter)) }
fn from_fn<F>(f: F) -> Self::With<FromFn<F>> { Self::lift(FromFn(f)) }
fn defer<F, O>(f: F) -> Self::With<Defer<F, O>>
where
F: FnOnce() -> Self::With<O>,
O: ObservableType,
{
Self::lift(Defer::new(f))
}
fn timer(delay: Duration) -> Self::With<Timer<Self::Scheduler>> {
Self::lift(Timer { delay, scheduler: Self::Scheduler::default() })
}
fn timer_with<S>(delay: Duration, scheduler: S) -> Self::With<Timer<S>> {
Self::lift(Timer { delay, scheduler })
}
fn timer_at(at: Instant) -> Self::With<Timer<Self::Scheduler>> {
let now = Instant::now();
let delay = if at > now { at - now } else { Duration::default() };
Self::lift(Timer { delay, scheduler: Self::Scheduler::default() })
}
fn timer_at_with<S>(at: Instant, scheduler: S) -> Self::With<Timer<S>> {
let now = Instant::now();
let delay = if at > now { at - now } else { Duration::default() };
Self::lift(Timer { delay, scheduler })
}
fn interval(period: Duration) -> Self::With<Interval<Self::Scheduler>> {
Self::lift(Interval { period, scheduler: Self::Scheduler::default() })
}
fn interval_with<S>(period: Duration, scheduler: S) -> Self::With<Interval<S>> {
Self::lift(Interval { period, scheduler })
}
fn from_future<F: std::future::Future>(
future: F,
) -> Self::With<from_future::FromFuture<F, Self::Scheduler>> {
Self::lift(from_future::FromFuture { future, scheduler: Self::Scheduler::default() })
}
fn from_future_with<F, S>(future: F, scheduler: S) -> Self::With<from_future::FromFuture<F, S>> {
Self::lift(from_future::FromFuture { future, scheduler })
}
fn from_future_result<F, Item, Err>(
future: F,
) -> Self::With<from_future::FromFutureResult<F, Self::Scheduler>>
where
F: std::future::Future<Output = Result<Item, Err>>,
{
Self::lift(from_future::FromFutureResult { future, scheduler: Self::Scheduler::default() })
}
fn from_future_result_with<F, S, Item, Err>(
future: F, scheduler: S,
) -> Self::With<from_future::FromFutureResult<F, S>>
where
F: std::future::Future<Output = Result<Item, Err>>,
{
Self::lift(from_future::FromFutureResult { future, scheduler })
}
fn from_stream<St>(stream: St) -> Self::With<from_stream::FromStream<St, Self::Scheduler>>
where
St: futures_core::stream::Stream,
{
Self::lift(from_stream::FromStream { stream, scheduler: Self::Scheduler::default() })
}
fn from_stream_with<St, S>(stream: St, scheduler: S) -> Self::With<from_stream::FromStream<St, S>>
where
St: futures_core::stream::Stream,
{
Self::lift(from_stream::FromStream { stream, scheduler })
}
fn from_stream_result<St, Item, Err>(
stream: St,
) -> Self::With<from_stream::FromStreamResult<St, Self::Scheduler>>
where
St: futures_core::stream::Stream<Item = Result<Item, Err>>,
{
Self::lift(from_stream::FromStreamResult { stream, scheduler: Self::Scheduler::default() })
}
fn from_stream_result_with<St, S, Item, Err>(
stream: St, scheduler: S,
) -> Self::With<from_stream::FromStreamResult<St, S>>
where
St: futures_core::stream::Stream<Item = Result<Item, Err>>,
{
Self::lift(from_stream::FromStreamResult { stream, scheduler })
}
fn merge_observables<O, I>(
observables: I,
) -> Self::With<crate::ops::merge_all::MergeAll<FromIter<I::IntoIter>>>
where
O: ObservableType,
I: IntoIterator<Item = Self::With<O>>,
{
let observables = observables.into_iter();
Self::lift(crate::ops::merge_all::MergeAll {
source: from_iter(observables),
concurrent: usize::MAX,
})
}
fn concat_observables<O, I>(
observables: I,
) -> Self::With<crate::ops::merge_all::MergeAll<FromIter<I::IntoIter>>>
where
O: ObservableType,
I: IntoIterator<Item = Self::With<O>>,
{
let observables = observables.into_iter();
Self::lift(crate::ops::merge_all::MergeAll {
source: from_iter(observables),
concurrent: 1, })
}
}
impl<C: Context<Inner = ()>> ObservableFactory for C {}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
context::{CellRc, MutRc},
prelude::*,
scheduler::{Task, TaskHandle},
};
#[derive(Clone, Copy, Default)]
struct CustomTestScheduler;
impl<S> crate::scheduler::Schedulable<CustomTestScheduler> for Task<S> {
type Future = std::future::Ready<()>;
fn into_future(self, _scheduler: &CustomTestScheduler) -> Self::Future {
std::future::ready(())
}
}
impl<S> crate::scheduler::Scheduler<Task<S>> for CustomTestScheduler {
fn schedule(&self, mut task: Task<S>, _delay: Option<Duration>) -> TaskHandle {
task.step();
TaskHandle::finished()
}
}
#[derive(Clone)]
struct CustomContext<T> {
inner: T,
scheduler: CustomTestScheduler,
}
impl<T> Context for CustomContext<T> {
type Scope = LocalScope;
type Inner = T;
type Scheduler = CustomTestScheduler;
type RcMut<U> = MutRc<U>;
type RcCell<U: Copy + Eq> = CellRc<U>;
type With<U> = CustomContext<U>;
type BoxedObserver<'a, Item, Err> = BoxedObserver<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err> = BoxedObserverMutRef<'a, Item, Err>;
type BoxedSubscription = BoxedSubscription;
type BoxedCoreObservable<'a, Item, Err> =
crate::ops::box_it::BoxedCoreObservable<'a, Item, Err, CustomTestScheduler>;
type BoxedCoreObservableMutRef<'a, Item: 'a, Err> =
crate::ops::box_it::BoxedCoreObservableMutRef<'a, Item, Err, CustomTestScheduler>;
type BoxedCoreObservableClone<'a, Item, Err> =
crate::ops::box_it::BoxedCoreObservableClone<'a, Item, Err, CustomTestScheduler>;
type BoxedCoreObservableMutRefClone<'a, Item: 'a, Err> =
crate::ops::box_it::BoxedCoreObservableMutRefClone<'a, Item, Err, CustomTestScheduler>;
fn from_parts(inner: T, scheduler: CustomTestScheduler) -> CustomContext<T> {
CustomContext { inner, scheduler }
}
fn lift<U>(inner: U) -> CustomContext<U> {
CustomContext { inner, scheduler: CustomTestScheduler }
}
fn scheduler(&self) -> &CustomTestScheduler { &self.scheduler }
fn inner(&self) -> &T { &self.inner }
fn inner_mut(&mut self) -> &mut T { &mut self.inner }
fn transform<U, F>(self, f: F) -> CustomContext<U>
where
F: FnOnce(T) -> U,
{
CustomContext { inner: f(self.inner), scheduler: self.scheduler }
}
fn wrap<U>(&self, inner: U) -> CustomContext<U> {
CustomContext { inner, scheduler: self.scheduler }
}
fn swap<U>(self, new_inner: U) -> (T, CustomContext<U>) {
(self.inner, CustomContext { inner: new_inner, scheduler: self.scheduler })
}
fn into_inner(self) -> Self::Inner { self.inner }
fn into_parts(self) -> (Self::Inner, Self::Scheduler) { (self.inner, self.scheduler) }
}
#[rxrust_macro::test]
fn test_factory_blanket_impl_with_defaults() {
let _local_of = Local::of(1);
let _shared_of = Shared::of(2);
}
#[rxrust_macro::test]
fn test_factory_blanket_impl_with_custom_scheduler() {
let _custom_local_of = CustomContext::of("hello");
}
#[rxrust_macro::test]
fn test_merge_observables_factory() {
use std::{cell::RefCell, rc::Rc};
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let obs1 = Local::from_iter([1, 2]);
let obs2 = Local::from_iter([3, 4]);
let obs3 = Local::from_iter([5, 6]);
Local::merge_observables([obs1, obs2, obs3]).subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
let merged = result.borrow();
assert_eq!(merged.len(), 6);
assert!(merged.contains(&1));
assert!(merged.contains(&2));
assert!(merged.contains(&3));
assert!(merged.contains(&4));
assert!(merged.contains(&5));
assert!(merged.contains(&6));
}
#[rxrust_macro::test]
fn test_merge_observables_factory_with_vec() {
use std::{cell::RefCell, rc::Rc};
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let observables =
vec![Local::from_iter([1, 2]), Local::from_iter([3, 4]), Local::from_iter([5, 6])];
Local::merge_observables(observables).subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(result.borrow().len(), 6);
}
#[rxrust_macro::test]
fn test_concat_observables_factory() {
use std::{cell::RefCell, rc::Rc};
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
let obs1 = Local::from_iter([1, 2]);
let obs2 = Local::from_iter([3, 4]);
let obs3 = Local::from_iter([5, 6]);
Local::concat_observables([obs1, obs2, obs3]).subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3, 4, 5, 6]);
}
#[rxrust_macro::test]
fn test_merge_observables_factory_empty() {
use std::{cell::RefCell, rc::Rc};
let result = Rc::new(RefCell::new(Vec::<i32>::new()));
let result_clone = result.clone();
let completed = Rc::new(RefCell::new(false));
let completed_clone = completed.clone();
let observables: Vec<Local<FromIter<std::vec::IntoIter<i32>>>> = vec![];
Local::merge_observables(observables)
.on_complete(move || *completed_clone.borrow_mut() = true)
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert!(result.borrow().is_empty());
assert!(*completed.borrow());
}
}