use std::convert::Infallible;
use crate::context::RcDerefMut;
pub trait Observer<Item, Err> {
fn next(&mut self, value: Item);
fn error(self, err: Err);
fn complete(self);
fn is_closed(&self) -> bool;
}
pub trait Emitter<Item, Err> {
fn next(&mut self, value: Item);
fn error(&mut self, err: Err);
fn complete(&mut self);
}
pub trait DynObserver<Item, Err> {
fn box_next(&mut self, value: Item);
fn box_error(self: Box<Self>, err: Err);
fn box_complete(self: Box<Self>);
fn box_is_closed(&self) -> bool;
}
impl<T, Item, Err> DynObserver<Item, Err> for T
where
T: Observer<Item, Err>,
{
fn box_next(&mut self, value: Item) { self.next(value); }
fn box_error(self: Box<Self>, err: Err) { self.error(err); }
fn box_complete(self: Box<Self>) { self.complete(); }
fn box_is_closed(&self) -> bool { self.is_closed() }
}
macro_rules! impl_observer_for_box {
(@impl $ty:ty, generics = [$($generics:tt)*], item_type = $item:ty) => {
#[allow(coherence_leak_check)]
impl<$($generics)*> Observer<$item, Err> for $ty {
#[inline]
fn next(&mut self, value: $item) {
(**self).box_next(value)
}
#[inline]
fn error(self, err: Err) {
self.box_error(err)
}
#[inline]
fn complete(self) {
self.box_complete()
}
#[inline]
fn is_closed(&self) -> bool {
(**self).box_is_closed()
}
}
};
(regular: $ty:ty, lifetime = $lf:lifetime) => {
impl_observer_for_box!(@impl $ty, generics = [$lf, Item, Err], item_type = Item);
};
(mut_ref: $ty:ty, lifetime = $lf:lifetime) => {
impl_observer_for_box!(@impl $ty, generics = [$lf, T, Err], item_type = &mut T);
};
}
impl_observer_for_box!(regular: Box<dyn DynObserver<Item, Err> + 'a>, lifetime = 'a);
impl_observer_for_box!(regular: Box<dyn DynObserver<Item, Err> + Send + 'a>, lifetime = 'a);
impl_observer_for_box!(mut_ref: Box<dyn for<'m> DynObserver<&'m mut T, Err> + 'a>, lifetime = 'a);
impl_observer_for_box!(
mut_ref: Box<dyn for<'m> DynObserver<&'m mut T, Err> + Send + 'a>,
lifetime = 'a
);
pub trait IntoBoxedObserver<O> {
fn into_boxed(self) -> O;
}
impl<'a, Item, Err, O> IntoBoxedObserver<Box<dyn DynObserver<Item, Err> + 'a>> for O
where
O: Observer<Item, Err> + 'a,
{
fn into_boxed(self) -> Box<dyn DynObserver<Item, Err> + 'a> { Box::new(self) }
}
impl<'a, Item, Err, O> IntoBoxedObserver<Box<dyn DynObserver<Item, Err> + Send + 'a>> for O
where
O: Observer<Item, Err> + Send + 'a,
{
fn into_boxed(self) -> Box<dyn DynObserver<Item, Err> + Send + 'a> { Box::new(self) }
}
#[allow(coherence_leak_check)]
impl<'a, T, Err, O> IntoBoxedObserver<Box<dyn for<'m> DynObserver<&'m mut T, Err> + 'a>> for O
where
O: for<'m> Observer<&'m mut T, Err> + 'a,
{
fn into_boxed(self) -> Box<dyn for<'m> DynObserver<&'m mut T, Err> + 'a> { Box::new(self) }
}
#[allow(coherence_leak_check)]
impl<'a, T, Err, O> IntoBoxedObserver<Box<dyn for<'m> DynObserver<&'m mut T, Err> + Send + 'a>>
for O
where
O: for<'m> Observer<&'m mut T, Err> + Send + 'a,
{
fn into_boxed(self) -> Box<dyn for<'m> DynObserver<&'m mut T, Err> + Send + 'a> { Box::new(self) }
}
pub type BoxedObserver<'a, Item, Err> = Box<dyn DynObserver<Item, Err> + 'a>;
pub type BoxedObserverSend<'a, Item, Err> = Box<dyn DynObserver<Item, Err> + Send + 'a>;
pub type BoxedObserverMutRef<'a, Item, Err> = Box<dyn for<'m> DynObserver<&'m mut Item, Err> + 'a>;
pub type BoxedObserverMutRefSend<'a, Item, Err> =
Box<dyn for<'m> DynObserver<&'m mut Item, Err> + Send + 'a>;
#[derive(Clone)]
pub struct FnMutObserver<F>(pub F);
impl<F, Item> Observer<Item, Infallible> for FnMutObserver<F>
where
F: FnMut(Item),
{
#[inline]
fn next(&mut self, v: Item) { (self.0)(v); }
#[inline]
fn error(self, _err: Infallible) {
}
#[inline]
fn complete(self) {
}
#[inline]
fn is_closed(&self) -> bool { false }
}
impl<O, Item, Err> Observer<Item, Err> for Option<O>
where
O: Observer<Item, Err>,
{
fn next(&mut self, value: Item) {
if let Some(inner) = self {
inner.next(value);
}
}
fn error(self, err: Err) {
if let Some(inner) = self {
inner.error(err);
}
}
fn complete(self) {
if let Some(inner) = self {
inner.complete();
}
}
fn is_closed(&self) -> bool { self.as_ref().is_none_or(Observer::is_closed) }
}
impl<O, Item, Err, P> Observer<Item, Err> for P
where
P: RcDerefMut<Target = Option<O>>,
O: Observer<Item, Err>,
{
fn next(&mut self, value: Item) { self.rc_deref_mut().next(value); }
fn error(self, err: Err) {
if let Some(inner) = self.rc_deref_mut().take() {
inner.error(err);
}
}
fn complete(self) {
if let Some(inner) = self.rc_deref_mut().take() {
inner.complete();
}
}
fn is_closed(&self) -> bool { self.rc_deref().is_none() }
}
#[cfg(test)]
mod tests {
use super::*;
struct TestObserver {
values: Vec<i32>,
}
impl Observer<i32, ()> for TestObserver {
fn next(&mut self, value: i32) { self.values.push(value); }
fn error(self, _: ()) {}
fn complete(self) {}
fn is_closed(&self) -> bool { false }
}
#[rxrust_macro::test]
fn test_observer_trait() {
let mut obs = TestObserver { values: vec![] };
obs.next(1);
obs.next(2);
assert_eq!(obs.values, vec![1, 2]);
assert!(!obs.is_closed());
}
#[rxrust_macro::test]
fn test_closure_as_observer() {
let mut count = 0;
let mut closure_obs = FnMutObserver(|v: i32| {
count += v;
});
closure_obs.next(10);
closure_obs.next(20);
assert_eq!(count, 30);
}
}