// rx_core_common 0.2.2
//
// rx_core's core traits and implementations
// Documentation
use std::sync::{Arc, Mutex};

use crate::{
	Observable, ObservableOutput, ObserverInput, ObserverUpgradesToSelf, PrimaryCategorySubscriber,
	RxObserver, SharedDestination, Subscriber, SubscriptionLike, Teardown, TeardownCollection,
	WithPrimaryCategory,
};

/// Categorizes a shared, mutex-guarded value as a subscriber.
///
/// Whatever primary category the wrapped type declares for itself, the
/// `Arc<Mutex<_>>` wrapper always reports [`PrimaryCategorySubscriber`].
impl<D: ?Sized + WithPrimaryCategory> WithPrimaryCategory for Arc<Mutex<D>> {
	type PrimaryCategory = PrimaryCategorySubscriber;
}

/// Carries `ObserverUpgradesToSelf` over from the wrapped type to its
/// `Arc<Mutex<_>>` wrapper. There are no items to implement here.
impl<D> ObserverUpgradesToSelf for Arc<Mutex<D>> where D: ?Sized + ObserverUpgradesToSelf {}

/// A shared destination accepts exactly the same value and error types as
/// the destination it wraps.
impl<D> ObserverInput for Arc<Mutex<D>>
where
	D: ?Sized + ObserverInput,
{
	/// Value type, taken unchanged from the wrapped destination.
	type In = <D as ObserverInput>::In;
	/// Error type, taken unchanged from the wrapped destination.
	type InError = <D as ObserverInput>::InError;
}

/// Lets an `Arc<Mutex<Destination>>` act as a shared handle to its inner
/// destination by locking the mutex around each access.
impl<Destination> SharedDestination<Destination> for Arc<Mutex<Destination>>
where
	Destination: 'static + ?Sized + Subscriber + Send + Sync,
{
	/// Locks the mutex and hands a shared reference to the destination to
	/// `accessor`.
	///
	/// If the mutex is poisoned the accessor is not called and the poison is
	/// silently ignored.
	fn access<F>(&mut self, accessor: F)
	where
		F: Fn(&Destination),
	{
		let Ok(guard) = self.lock() else {
			// Poisoned: skip the access entirely.
			return;
		};
		accessor(&guard);
	}

	/// Locks the mutex and hands an exclusive reference to the destination to
	/// `accessor`.
	///
	/// If the mutex is poisoned the accessor is not called and the poison is
	/// silently ignored.
	fn access_mut<F>(&mut self, mut accessor: F)
	where
		F: FnMut(&mut Destination),
	{
		let Ok(mut guard) = self.lock() else {
			// Poisoned: skip the access entirely.
			return;
		};
		accessor(&mut guard);
	}
}

/// Forwards observer signals through the mutex to the wrapped destination.
///
/// Every signal follows the same rule: with a healthy lock the signal is
/// passed on; with a poisoned lock the signal is dropped and the wrapped
/// destination is unsubscribed instead.
impl<Destination> RxObserver for Arc<Mutex<Destination>>
where
	Destination: ?Sized + RxObserver + SubscriptionLike,
{
	/// Delivers `next` to the wrapped destination, or unsubscribes it if the
	/// mutex is poisoned.
	fn next(&mut self, next: Self::In) {
		match self.lock() {
			Err(poisoned) => {
				// The value is discarded; recover the guard only to shut down.
				let mut destination = poisoned.into_inner();
				destination.unsubscribe()
			}
			Ok(mut destination) => destination.next(next),
		}
	}

	/// Delivers `error` to the wrapped destination, or unsubscribes it if the
	/// mutex is poisoned.
	fn error(&mut self, error: Self::InError) {
		match self.lock() {
			Err(poisoned) => {
				// The error is discarded; recover the guard only to shut down.
				let mut destination = poisoned.into_inner();
				destination.unsubscribe()
			}
			Ok(mut destination) => destination.error(error),
		}
	}

	/// Signals completion to the wrapped destination, or unsubscribes it if
	/// the mutex is poisoned.
	fn complete(&mut self) {
		match self.lock() {
			Err(poisoned) => {
				let mut destination = poisoned.into_inner();
				destination.unsubscribe()
			}
			Ok(mut destination) => destination.complete(),
		}
	}
}

/// Subscription control for a shared destination. Both methods deliberately
/// see through mutex poisoning and operate on the inner value regardless.
impl<Destination> SubscriptionLike for Arc<Mutex<Destination>>
where
	Destination: ?Sized + SubscriptionLike,
{
	/// Reports whether the wrapped destination is closed.
	///
	/// Poison is ignored here so that the signal paths, which unsubscribe on
	/// a poisoned lock, can still query the closed state.
	fn is_closed(&self) -> bool {
		let destination = match self.lock() {
			Ok(guard) => guard,
			Err(poisoned) => poisoned.into_inner(),
		};
		destination.is_closed()
	}

	/// Unsubscribes the wrapped destination.
	///
	/// Poison is ignored here: it only matters to signals that still need the
	/// destination, and those already respond to a poisoned lock by
	/// unsubscribing.
	// NOTE(review): the original rationale also mentions avoiding a duplicate
	// poison report ("double print"); no reporting is visible in this impl -
	// confirm where, if anywhere, poison is logged.
	fn unsubscribe(&mut self) {
		let mut destination = match self.lock() {
			Ok(guard) => guard,
			Err(poisoned) => poisoned.into_inner(),
		};
		destination.unsubscribe()
	}
}

/// Registers teardowns on the wrapped destination through the mutex.
impl<Destination> TeardownCollection for Arc<Mutex<Destination>>
where
	Destination: ?Sized + TeardownCollection + SubscriptionLike,
{
	/// Adds `teardown` to the wrapped destination.
	///
	/// If the mutex is poisoned the teardown is not stored: it is executed
	/// immediately, and then the wrapped destination is unsubscribed.
	fn add_teardown(&mut self, teardown: Teardown) {
		let mut destination = match self.lock() {
			Ok(guard) => guard,
			Err(poisoned) => {
				// Run the teardown first, then shut the destination down.
				teardown.execute();
				poisoned.into_inner().unsubscribe();
				return;
			}
		};
		destination.add_teardown(teardown);
	}
}

/// A shared observable emits exactly the same value and error types as the
/// observable it wraps.
impl<O: ObservableOutput> ObservableOutput for Arc<Mutex<O>> {
	/// Value type, taken unchanged from the wrapped observable.
	type Out = <O as ObservableOutput>::Out;
	/// Error type, taken unchanged from the wrapped observable.
	type OutError = <O as ObservableOutput>::OutError;
}

/// Subscribes to the wrapped observable through the mutex.
impl<O> Observable for Arc<Mutex<O>>
where
	O: Observable,
{
	/// The subscription type is the wrapped observable's own subscription
	/// type for the same destination.
	type Subscription<Destination>
		= O::Subscription<Destination>
	where
		Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError>;

	/// Upgrades `destination` and subscribes it to the wrapped observable.
	///
	/// With a healthy lock the wrapped observable's subscription is returned
	/// as is. With a poisoned lock the subscription is still created on the
	/// recovered inner value, but it is unsubscribed before being returned.
	fn subscribe<Destination>(
		&mut self,
		destination: Destination,
	) -> Self::Subscription<Destination::Upgraded>
	where
		Destination: 'static
			+ crate::UpgradeableObserver<In = Self::Out, InError = Self::OutError>
			+ Send
			+ Sync,
	{
		// The upgrade happens before the lock is taken.
		let upgraded = destination.upgrade();

		match self.lock() {
			Err(poisoned) => {
				// Scope the recovered guard so the lock is released before
				// `unsubscribe` is called on the new subscription.
				let mut subscription = {
					let mut observable = poisoned.into_inner();
					observable.subscribe(upgraded)
				};
				subscription.unsubscribe();
				subscription
			}
			Ok(mut observable) => observable.subscribe(upgraded),
		}
	}
}