use std::sync::{Arc, Mutex};
use crate::{
Observable, ObservableOutput, ObserverInput, ObserverUpgradesToSelf, PrimaryCategorySubscriber,
RxObserver, SharedDestination, Subscriber, SubscriptionLike, Teardown, TeardownCollection,
WithPrimaryCategory,
};
/// A mutex-guarded, reference-counted destination is always categorised as a
/// subscriber, whatever primary category the wrapped type reports for itself.
impl<Destination: ?Sized + WithPrimaryCategory> WithPrimaryCategory for Arc<Mutex<Destination>> {
    type PrimaryCategory = PrimaryCategorySubscriber;
}
/// Marker impl: the shared wrapper carries [`ObserverUpgradesToSelf`] whenever
/// the wrapped destination does. The impl has no items of its own.
impl<Destination: ?Sized + ObserverUpgradesToSelf> ObserverUpgradesToSelf
    for Arc<Mutex<Destination>>
{
}
/// The shared wrapper accepts exactly the same value and error types as the
/// destination it guards.
impl<Destination: ?Sized + ObserverInput> ObserverInput for Arc<Mutex<Destination>> {
    /// Value type, taken unchanged from the wrapped destination.
    type In = <Destination as ObserverInput>::In;
    /// Error type, taken unchanged from the wrapped destination.
    type InError = <Destination as ObserverInput>::InError;
}
/// Gives closure-based access to a destination shared behind `Arc<Mutex<_>>`.
///
/// Both accessors take the lock for the duration of the closure call. When the
/// mutex is poisoned the closure is **not** invoked and the call returns
/// without any further effect.
///
/// NOTE(review): the `SubscriptionLike` impl for this same type recovers the
/// guard from a poisoned lock instead of skipping the call — confirm that
/// skipping is the intended behaviour here.
impl<Destination> SharedDestination<Destination> for Arc<Mutex<Destination>>
where
    Destination: 'static + ?Sized + Subscriber + Send + Sync,
{
    /// Runs `accessor` with a shared reference to the locked destination.
    ///
    /// Does nothing if the lock is poisoned.
    fn access<F: Fn(&Destination)>(&mut self, accessor: F) {
        let Ok(guard) = self.lock() else {
            return;
        };
        accessor(&*guard);
    }

    /// Runs `accessor` with an exclusive reference to the locked destination.
    ///
    /// Does nothing if the lock is poisoned.
    fn access_mut<F: FnMut(&mut Destination)>(&mut self, mut accessor: F) {
        let Ok(mut guard) = self.lock() else {
            return;
        };
        accessor(&mut *guard);
    }
}
/// Forwards observer signals to the destination guarded by the mutex.
///
/// Every signal follows the same rule: with a healthy lock the signal is
/// passed on to the wrapped destination; with a poisoned lock the signal is
/// dropped and the wrapped destination is unsubscribed through the recovered
/// guard instead.
impl<Destination> RxObserver for Arc<Mutex<Destination>>
where
    Destination: ?Sized + RxObserver + SubscriptionLike,
{
    /// Forwards a `next` value, or unsubscribes the destination if the lock is
    /// poisoned.
    fn next(&mut self, next: Self::In) {
        let mut guard = match self.lock() {
            Ok(guard) => guard,
            Err(poisoned) => {
                poisoned.into_inner().unsubscribe();
                return;
            }
        };
        guard.next(next)
    }

    /// Forwards an `error`, or unsubscribes the destination if the lock is
    /// poisoned (the error value is discarded in that case).
    fn error(&mut self, error: Self::InError) {
        let mut guard = match self.lock() {
            Ok(guard) => guard,
            Err(poisoned) => {
                poisoned.into_inner().unsubscribe();
                return;
            }
        };
        guard.error(error)
    }

    /// Forwards `complete`, or unsubscribes the destination if the lock is
    /// poisoned.
    fn complete(&mut self) {
        let mut guard = match self.lock() {
            Ok(guard) => guard,
            Err(poisoned) => {
                poisoned.into_inner().unsubscribe();
                return;
            }
        };
        guard.complete()
    }
}
/// Subscription state and cancellation are delegated to the guarded value.
///
/// Lock poisoning is ignored here: the guard is recovered from the poison
/// error and the call is delegated regardless.
impl<Destination> SubscriptionLike for Arc<Mutex<Destination>>
where
    Destination: ?Sized + SubscriptionLike,
{
    /// Reports whether the wrapped value considers itself closed.
    fn is_closed(&self) -> bool {
        let guard = match self.lock() {
            Ok(guard) => guard,
            // A poisoned lock still hands out its guard; keep using it.
            Err(poisoned) => poisoned.into_inner(),
        };
        guard.is_closed()
    }

    /// Unsubscribes the wrapped value.
    fn unsubscribe(&mut self) {
        let mut guard = match self.lock() {
            Ok(guard) => guard,
            // A poisoned lock still hands out its guard; keep using it.
            Err(poisoned) => poisoned.into_inner(),
        };
        guard.unsubscribe()
    }
}
/// Registers teardowns on the destination guarded by the mutex.
impl<Destination> TeardownCollection for Arc<Mutex<Destination>>
where
    Destination: ?Sized + TeardownCollection + SubscriptionLike,
{
    /// Hands `teardown` over to the wrapped destination.
    ///
    /// If the lock is poisoned the teardown is not stored: it is executed on
    /// the spot, and the wrapped destination is then unsubscribed through the
    /// recovered guard.
    fn add_teardown(&mut self, teardown: Teardown) {
        let poisoned = match self.lock() {
            Ok(mut guard) => {
                guard.add_teardown(teardown);
                return;
            }
            Err(poisoned) => poisoned,
        };

        // Poisoned path: run the teardown now rather than registering it,
        // then shut the destination down.
        teardown.execute();
        poisoned.into_inner().unsubscribe();
    }
}
/// A shared observable emits exactly the value and error types of the
/// observable it guards.
impl<O: ObservableOutput> ObservableOutput for Arc<Mutex<O>> {
    /// Value type, taken unchanged from the wrapped observable.
    type Out = <O as ObservableOutput>::Out;
    /// Error type, taken unchanged from the wrapped observable.
    type OutError = <O as ObservableOutput>::OutError;
}
/// Subscribing to a shared observable locks it and subscribes to the
/// observable inside; the subscription type is the wrapped observable's own.
impl<O: Observable> Observable for Arc<Mutex<O>> {
    type Subscription<Destination>
        = O::Subscription<Destination>
    where
        Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError>;

    /// Upgrades `destination` and subscribes it to the wrapped observable.
    ///
    /// If the lock is poisoned, the subscription is still created through the
    /// recovered guard, but it is unsubscribed before being returned.
    fn subscribe<Destination>(
        &mut self,
        destination: Destination,
    ) -> Self::Subscription<Destination::Upgraded>
    where
        Destination: 'static
            + crate::UpgradeableObserver<In = Self::Out, InError = Self::OutError>
            + Send
            + Sync,
    {
        let upgraded = destination.upgrade();

        let poisoned = match self.lock() {
            Ok(mut source) => return source.subscribe(upgraded),
            Err(poisoned) => poisoned,
        };

        // Poisoned path: produce a subscription of the right type, but hand it
        // back already unsubscribed.
        let mut subscription = poisoned.into_inner().subscribe(upgraded);
        subscription.unsubscribe();
        subscription
    }
}