rx_core_common 0.2.2

rx_core's core traits and implementations
Documentation
use crate::{ComposableOperator, Observable, Subscriber, UpgradeableObserver};
use rx_core_macro_observable_derive::RxObservable;

#[derive(RxObservable, Clone, Debug)]
#[_rx_core_common_crate(crate)]
#[rx_out(Op::Out)]
#[rx_out_error(Op::OutError)]
pub struct Pipe<Source, Op>
where
	Source: Observable,
	Op: 'static + ComposableOperator<In = Source::Out, InError = Source::OutError>,
{
	source_observable: Source,
	operator: Op,
}

impl<Source, Op> Pipe<Source, Op>
where
	Source: Observable,
	Op: 'static + ComposableOperator<In = Source::Out, InError = Source::OutError>,
{
	pub fn new(source_observable: Source, operator: Op) -> Self {
		Self {
			source_observable,
			operator,
		}
	}
}

impl<Source, Op> Observable for Pipe<Source, Op>
where
	Source: Observable,
	Op: 'static + ComposableOperator<In = Source::Out, InError = Source::OutError>,
{
	type Subscription<Destination>
		= Source::Subscription<
		<Op as ComposableOperator>::Subscriber<<Destination as UpgradeableObserver>::Upgraded>,
	>
	where
		Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError>;

	#[inline]
	fn subscribe<Destination>(
		&mut self,
		observer: Destination,
	) -> Self::Subscription<Destination::Upgraded>
	where
		Destination:
			'static + UpgradeableObserver<In = Self::Out, InError = Self::OutError> + Send + Sync,
	{
		let destination = observer.upgrade();
		let operator_subscriber = self.operator.operator_subscribe(destination);
		self.source_observable.subscribe(operator_subscriber)
	}
}