rx_core_common 0.2.2

rx_core's core traits and implementations
Documentation
use std::{
	marker::PhantomData,
	sync::{Arc, Mutex},
};

use crate::{
	ErasedSubscriber, Observable, PhantomInvariant, Signal, Subscriber, SubscriptionData,
	UpgradeableObserver,
};
use rx_core_macro_observable_derive::RxObservable;

#[derive(RxObservable)]
#[_rx_core_common_crate(crate)]
#[rx_out(Out)]
#[rx_out_error(OutError)]
pub struct ErasedObservable<Out, OutError>
where
	Out: Signal,
	OutError: Signal,
{
	subscribe:
		Arc<Mutex<dyn FnMut(ErasedSubscriber<Out, OutError>) -> SubscriptionData + Send + Sync>>,
	_phantom_data: PhantomInvariant<(Out, OutError)>,
}

impl<Out, OutError> ErasedObservable<Out, OutError>
where
	Out: 'static + Signal,
	OutError: 'static + Signal,
{
	pub fn new<O>(mut observable: O) -> Self
	where
		O: 'static + Observable<Out = Out, OutError = OutError> + Send + Sync,
	{
		ErasedObservable {
			subscribe: Arc::new(Mutex::new(move |destination| {
				let subscription = observable.subscribe(destination);
				SubscriptionData::new_with_teardown(subscription.into())
			})),
			_phantom_data: PhantomData,
		}
	}
}

impl<Out, OutError> Clone for ErasedObservable<Out, OutError>
where
	Out: 'static + Signal,
	OutError: 'static + Signal,
{
	fn clone(&self) -> Self {
		Self {
			subscribe: self.subscribe.clone(),
			_phantom_data: PhantomData,
		}
	}
}

impl<Out, OutError> Observable for ErasedObservable<Out, OutError>
where
	Out: Signal,
	OutError: Signal,
{
	type Subscription<Destination>
		= SubscriptionData
	where
		Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError>;

	fn subscribe<Destination>(
		&mut self,
		destination: Destination,
	) -> Self::Subscription<Destination::Upgraded>
	where
		Destination:
			'static + UpgradeableObserver<In = Self::Out, InError = Self::OutError> + Send + Sync,
	{
		let mut subscribe = self
			.subscribe
			.lock()
			.unwrap_or_else(|poison_error| poison_error.into_inner());

		(subscribe)(ErasedSubscriber::new(destination.upgrade()))
	}
}