pub use crate::rc::*;
#[cfg(feature = "scheduler")]
use crate::scheduler::{LocalScheduler, SharedScheduler};
use crate::{
observer::{
BoxedObserver, BoxedObserverMutRef, BoxedObserverMutRefSend, BoxedObserverSend, Observer,
},
subscription::{BoxedSubscription, BoxedSubscriptionSend, Subscription},
};
pub trait Scope {
type RcMut<T>: From<T> + Clone + RcDerefMut<Target = T>;
type RcCell<T: Copy + Eq>: SharedCell<T>;
type BoxedSubscription: Subscription;
type BoxedObserver<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err>;
}
pub struct LocalScope;
pub struct SharedScope;
impl Scope for LocalScope {
type RcMut<T> = MutRc<T>;
type RcCell<T: Copy + Eq> = CellRc<T>;
type BoxedSubscription = BoxedSubscription;
type BoxedObserver<'a, Item, Err> = BoxedObserver<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err> = BoxedObserverMutRef<'a, Item, Err>;
}
impl Scope for SharedScope {
type RcMut<T> = MutArc<T>;
type RcCell<T: Copy + Eq> = CellArc<T>;
type BoxedSubscription = BoxedSubscriptionSend;
type BoxedObserver<'a, Item, Err> = BoxedObserverSend<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err> = BoxedObserverMutRefSend<'a, Item, Err>;
}
pub trait Context: Sized {
type Scope: Scope;
type Inner;
type Scheduler: Clone + Default;
type RcMut<T>: From<T> + Clone + RcDerefMut<Target = T>;
type RcCell<T: Copy + Eq>: SharedCell<T>;
type BoxedSubscription: Subscription;
type BoxedObserver<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err>;
type BoxedCoreObservable<'a, Item, Err>;
type BoxedCoreObservableMutRef<'a, Item: 'a, Err>;
type BoxedCoreObservableClone<'a, Item, Err>;
type BoxedCoreObservableMutRefClone<'a, Item: 'a, Err>;
type With<T>: Context<Inner = T, Scope = Self::Scope, Scheduler = Self::Scheduler>;
fn from_parts(inner: Self::Inner, scheduler: Self::Scheduler) -> Self;
fn new(inner: Self::Inner) -> Self { Self::from_parts(inner, Self::Scheduler::default()) }
fn lift<U>(inner: U) -> Self::With<U>;
fn transform<U, F>(self, f: F) -> Self::With<U>
where
F: FnOnce(Self::Inner) -> U;
fn wrap<U>(&self, inner: U) -> Self::With<U>;
fn swap<U>(self, inner: U) -> (Self::Inner, Self::With<U>);
fn scheduler(&self) -> &Self::Scheduler;
fn inner(&self) -> &Self::Inner;
fn inner_mut(&mut self) -> &mut Self::Inner;
fn into_inner(self) -> Self::Inner;
fn into_parts(self) -> (Self::Inner, Self::Scheduler);
}
#[derive(Clone)]
pub struct LocalCtx<T, S> {
pub inner: T,
pub scheduler: S,
}
#[derive(Clone)]
pub struct SharedCtx<T, S> {
pub inner: T,
pub scheduler: S,
}
macro_rules! impl_context_for_container {
(
$Struct:ident,
$ScopeType:ty,
$RcMutType:ident,
$RcCellType:ident,
$BoxedObserver:ident,
$BoxedObserverMutRef:ident,
$BoxedSubscription:ty,
$BoxedCoreObservable:ty,
$BoxedCoreObservableMutRef:ty,
$BoxedCoreObservableClone:ty,
$BoxedCoreObservableMutRefClone:ty
) => {
impl<T, S> Context for $Struct<T, S>
where
S: Clone + Default,
{
type Scope = $ScopeType;
type Inner = T;
type Scheduler = S;
type RcMut<U> = $RcMutType<U>;
type RcCell<U: Copy + Eq> = $RcCellType<U>;
type With<U> = $Struct<U, S>;
type BoxedObserver<'a, Item, Err> = $BoxedObserver<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err> = $BoxedObserverMutRef<'a, Item, Err>;
type BoxedSubscription = $BoxedSubscription;
type BoxedCoreObservable<'a, Item, Err> = $BoxedCoreObservable;
type BoxedCoreObservableMutRef<'a, Item: 'a, Err> = $BoxedCoreObservableMutRef;
type BoxedCoreObservableClone<'a, Item, Err> = $BoxedCoreObservableClone;
type BoxedCoreObservableMutRefClone<'a, Item: 'a, Err> = $BoxedCoreObservableMutRefClone;
fn from_parts(inner: T, scheduler: S) -> $Struct<T, S> { $Struct { inner, scheduler } }
fn lift<U>(inner: U) -> $Struct<U, S> { $Struct { inner, scheduler: S::default() } }
fn scheduler(&self) -> &S { &self.scheduler }
fn inner(&self) -> &T { &self.inner }
fn inner_mut(&mut self) -> &mut T { &mut self.inner }
fn into_inner(self) -> Self::Inner { self.inner }
fn into_parts(self) -> (Self::Inner, Self::Scheduler) { (self.inner, self.scheduler) }
fn transform<U, F>(self, f: F) -> $Struct<U, S>
where
F: FnOnce(T) -> U,
{
$Struct { inner: f(self.inner), scheduler: self.scheduler }
}
fn wrap<U>(&self, inner: U) -> $Struct<U, S> {
$Struct { inner, scheduler: self.scheduler.clone() }
}
fn swap<U>(self, new_inner: U) -> (T, $Struct<U, S>) {
(self.inner, $Struct { inner: new_inner, scheduler: self.scheduler })
}
}
};
}
impl_context_for_container!(
LocalCtx,
LocalScope,
MutRc,
CellRc,
BoxedObserver,
BoxedObserverMutRef,
BoxedSubscription,
crate::observable::boxed::BoxedCoreObservable<'a, Item, Err, S>,
crate::observable::boxed::BoxedCoreObservableMutRef<'a, Item, Err, S>,
crate::observable::boxed::BoxedCoreObservableClone<'a, Item, Err, S>,
crate::observable::boxed::BoxedCoreObservableMutRefClone<'a, Item, Err, S>
);
impl_context_for_container!(
SharedCtx,
SharedScope,
MutArc,
CellArc,
BoxedObserverSend,
BoxedObserverMutRefSend,
BoxedSubscriptionSend,
crate::observable::boxed::BoxedCoreObservableSend<'a, Item, Err, S>,
crate::observable::boxed::BoxedCoreObservableMutRefSend<'a, Item, Err, S>,
crate::observable::boxed::BoxedCoreObservableSendClone<'a, Item, Err, S>,
crate::observable::boxed::BoxedCoreObservableMutRefSendClone<'a, Item, Err, S>
);
#[cfg(feature = "scheduler")]
pub type Local<T> = LocalCtx<T, LocalScheduler>;
#[cfg(feature = "scheduler")]
pub type Shared<T> = SharedCtx<T, SharedScheduler>;
#[cfg(test)]
pub type TestCtx<T> = LocalCtx<T, crate::scheduler::test_scheduler::TestScheduler>;
macro_rules! impl_observer_and_subscription {
($Struct:ident $(+ $Bound:ident)?) => {
impl<
Item,
Err,
S,
Inner: Observer<Item, Err> $(+ $Bound)?
> Observer<Item, Err> for $Struct<Inner, S>
where
S: Clone + Default,
{
fn next(&mut self, value: Item) { self.inner.next(value); }
fn error(self, err: Err) { self.inner.error(err); }
fn complete(self) { self.inner.complete(); }
fn is_closed(&self) -> bool { self.inner.is_closed() }
}
impl<S, Inner: Subscription $(+ $Bound)? > Subscription for $Struct<Inner, S>
where
S: Clone + Default,
{
fn unsubscribe(self) { self.inner.unsubscribe(); }
fn is_closed(&self) -> bool { self.inner.is_closed() }
}
};
}
impl_observer_and_subscription!(LocalCtx);
impl_observer_and_subscription!(SharedCtx + Send);
#[cfg(feature = "scheduler")]
impl<T, S> LocalCtx<T, S>
where
T: Send + 'static, S: Clone + Default,
{
pub fn into_shared(self) -> Shared<T> { Shared::new(self.into_inner()) }
}
#[cfg(feature = "scheduler")]
impl<T, S> SharedCtx<T, S>
where
S: Clone + Default,
{
pub fn into_local(self) -> Local<T> { Local::new(self.into_inner()) }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
factory::ObservableFactory,
observable::Observable,
scheduler::{Duration, Task, TaskHandle},
};
#[derive(Clone, Copy, Default)]
struct CustomTestScheduler;
impl<S> crate::scheduler::Schedulable<CustomTestScheduler> for Task<S> {
type Future = std::future::Ready<()>;
fn into_future(self, _scheduler: &CustomTestScheduler) -> Self::Future {
std::future::ready(())
}
}
impl<S> crate::scheduler::Scheduler<Task<S>> for CustomTestScheduler {
fn schedule(&self, mut task: Task<S>, _delay: Option<Duration>) -> TaskHandle {
task.step();
TaskHandle::finished()
}
}
#[derive(Clone)]
struct CustomContext<T> {
inner: T,
scheduler: CustomTestScheduler,
}
impl<T> Context for CustomContext<T> {
type Scope = LocalScope;
type Inner = T;
type Scheduler = CustomTestScheduler;
type RcMut<U> = MutRc<U>;
type RcCell<U: Copy + Eq> = CellRc<U>;
type With<U> = CustomContext<U>;
type BoxedObserver<'a, Item, Err> = BoxedObserver<'a, Item, Err>;
type BoxedObserverMutRef<'a, Item: 'a, Err> = BoxedObserverMutRef<'a, Item, Err>;
type BoxedSubscription = BoxedSubscription;
type BoxedCoreObservable<'a, Item, Err> =
crate::observable::boxed::BoxedCoreObservable<'a, Item, Err, CustomTestScheduler>;
type BoxedCoreObservableMutRef<'a, Item: 'a, Err> =
crate::observable::boxed::BoxedCoreObservableMutRef<'a, Item, Err, CustomTestScheduler>;
type BoxedCoreObservableClone<'a, Item, Err> =
crate::observable::boxed::BoxedCoreObservableClone<'a, Item, Err, CustomTestScheduler>;
type BoxedCoreObservableMutRefClone<'a, Item: 'a, Err> =
crate::observable::boxed::BoxedCoreObservableMutRefClone<'a, Item, Err, CustomTestScheduler>;
fn from_parts(inner: T, scheduler: CustomTestScheduler) -> CustomContext<T> {
CustomContext { inner, scheduler }
}
fn lift<U>(inner: U) -> CustomContext<U> {
CustomContext { inner, scheduler: CustomTestScheduler }
}
fn scheduler(&self) -> &CustomTestScheduler { &self.scheduler }
fn inner(&self) -> &T { &self.inner }
fn inner_mut(&mut self) -> &mut T { &mut self.inner }
fn transform<U, F>(self, f: F) -> CustomContext<U>
where
F: FnOnce(T) -> U,
{
CustomContext { inner: f(self.inner), scheduler: self.scheduler }
}
fn wrap<U>(&self, inner: U) -> CustomContext<U> {
CustomContext { inner, scheduler: self.scheduler }
}
fn swap<U>(self, new_inner: U) -> (T, CustomContext<U>) {
(self.inner, CustomContext { inner: new_inner, scheduler: self.scheduler })
}
fn into_inner(self) -> Self::Inner { self.inner }
fn into_parts(self) -> (Self::Inner, Self::Scheduler) { (self.inner, self.scheduler) }
}
#[rxrust_macro::test]
fn test_factory_blanket_impl_with_defaults() {
let _local_of = Local::of(1);
let _shared_of = Shared::of(2);
}
#[rxrust_macro::test]
fn test_factory_blanket_impl_with_custom_scheduler() {
let _custom_local_of = CustomContext::of("hello");
}
#[rxrust_macro::test]
fn test_custom_scheduler_type_alias() {
#[derive(Clone, Copy, Default)]
struct MyScheduler;
impl<S> crate::scheduler::Schedulable<MyScheduler> for Task<S> {
type Future = std::future::Ready<()>;
fn into_future(self, _scheduler: &MyScheduler) -> Self::Future { std::future::ready(()) }
}
impl<S> crate::scheduler::Scheduler<Task<S>> for MyScheduler {
fn schedule(
&self, mut task: Task<S>, _delay: Option<Duration>,
) -> crate::scheduler::TaskHandle {
task.step();
crate::scheduler::TaskHandle::finished()
}
}
type MyLocal<T> = LocalCtx<T, MyScheduler>;
let executed = std::rc::Rc::new(std::cell::RefCell::new(false));
let exec_clone = executed.clone();
MyLocal::of(123)
.delay(Duration::from_secs(1))
.subscribe(move |v| {
assert_eq!(v, 123);
*exec_clone.borrow_mut() = true;
});
assert!(*executed.borrow());
}
}
#[cfg(test)]
mod context_conversion_tests {
use super::*;
#[rxrust_macro::test]
fn test_local_to_shared_conversion() {
let local_ctx = Local::new(100);
assert_eq!(*local_ctx.inner(), 100);
let shared_ctx = local_ctx.into_shared();
assert_eq!(*shared_ctx.inner(), 100);
let _: Shared<i32> = shared_ctx;
}
#[rxrust_macro::test]
fn test_shared_to_local_conversion() {
let shared_ctx = Shared::new(200);
assert_eq!(*shared_ctx.inner(), 200);
let local_ctx = shared_ctx.into_local();
assert_eq!(*local_ctx.inner(), 200);
let _: Local<i32> = local_ctx;
}
#[rxrust_macro::test]
fn test_round_trip_conversion() {
let original_local = Local::new(300);
assert_eq!(*original_local.inner(), 300);
let shared_from_local = original_local.into_shared();
assert_eq!(*shared_from_local.inner(), 300);
let local_from_shared = shared_from_local.into_local();
assert_eq!(*local_from_shared.inner(), 300);
let _: Local<i32> = local_from_shared;
}
}