rx_core_common 0.2.2

rx_core's core traits and implementations
Documentation
use crate::{ComposableOperator, Observable, Operator, Subscriber};
use rx_core_macro_operator_derive::RxOperator;

use super::Pipe;

#[derive(RxOperator, Clone, Debug)]
#[_rx_core_common_crate(crate)]
#[rx_in(Op::In)]
#[rx_in_error(Op::InError)]
#[rx_out(Op::Out)]
#[rx_out_error(Op::OutError)]
pub struct ComposeOperator<Op>
where
	Op: 'static + ComposableOperator,
{
	operator: Op,
}

impl<Op> ComposeOperator<Op>
where
	Op: 'static + ComposableOperator,
{
	#[inline]
	pub fn new(operator: Op) -> Self {
		Self { operator }
	}
}

impl<Op> From<Op> for ComposeOperator<Op>
where
	Op: 'static + ComposableOperator,
{
	#[inline]
	fn from(operator: Op) -> Self {
		ComposeOperator::new(operator)
	}
}

impl<Op> ComposableOperator for ComposeOperator<Op>
where
	Op: 'static + ComposableOperator,
{
	type Subscriber<Destination>
		= Op::Subscriber<Destination>
	where
		Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError> + Send + Sync;

	#[inline]
	fn operator_subscribe<Destination>(
		&mut self,
		destination: Destination,
	) -> Self::Subscriber<Destination>
	where
		Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError> + Send + Sync,
	{
		self.operator.operator_subscribe(destination)
	}
}

impl<'o, Op> Operator<'o> for Op
where
	Op: 'static + ComposableOperator,
{
	type OutObservable<InObservable>
		= Pipe<InObservable, Op>
	where
		InObservable: 'o + Observable<Out = Self::In, OutError = Self::InError> + Send + Sync;

	#[inline]
	fn operate<InObservable>(self, source: InObservable) -> Self::OutObservable<InObservable>
	where
		InObservable: 'o + Observable<Out = Self::In, OutError = Self::InError> + Send + Sync,
	{
		Pipe::new(source, self)
	}
}