use std::sync::{Arc, Mutex};
use crate::{
subjects::{SubjectEmitter, SubjectReceiver},
subscribe::{Fuse, Subscription, SubscriptionHandle, UnsubscribeLogic},
ObservableExt, Subject, Subscribeable, Unsubscribeable,
};
#[derive(Clone)]
pub struct Connectable<T> {
source: Arc<Mutex<dyn ObservableExt<T> + Send + Sync>>,
state_subject: (SubjectEmitter<T>, SubjectReceiver<T>),
fused: bool,
defused: bool,
subject: bool,
connected: Arc<Mutex<bool>>,
connected_subscription: Arc<Mutex<Option<Subscription>>>,
}
impl<T: Clone + 'static> Connectable<T> {
pub fn new(source: Arc<Mutex<dyn ObservableExt<T> + Send + Sync>>) -> Self {
Connectable {
source,
state_subject: Subject::emitter_receiver(),
fused: false,
defused: false,
subject: false,
connected: Arc::new(Mutex::new(false)),
connected_subscription: Arc::new(Mutex::new(None)),
}
}
#[must_use]
pub fn connect(self) -> Subscription {
*self.connected.lock().unwrap() = true;
let mut subscription = self
.source
.lock()
.unwrap()
.subscribe(self.state_subject.0.into());
let subscription_future = subscription.subscription_future;
subscription.subscription_future = SubscriptionHandle::Nil;
*self.connected_subscription.lock().unwrap() = Some(subscription);
let cs = Arc::clone(&self.connected_subscription);
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
let connectable_subscription = cs.lock().unwrap().take();
if let Some(connectable_subscription) = connectable_subscription {
connectable_subscription.unsubscribe();
}
})),
subscription_future,
)
}
}
impl<T> Fuse for Connectable<T> {
fn set_fused(&mut self, fused: bool, defused: bool) {
self.fused = fused;
self.defused = defused;
}
fn get_fused(&self) -> (bool, bool) {
(self.fused, self.defused)
}
}
impl<T: 'static> Subscribeable for Connectable<T> {
type ObsType = T;
fn subscribe(
&mut self,
mut s: crate::subscribe::Subscriber<Self::ObsType>,
) -> crate::subscribe::Subscription {
let (fused, defused) = s.get_fused();
if defused || (fused && !self.fused) {
self.defused = s.defused;
self.fused = s.fused;
} else {
s.set_fused(self.fused, self.defused);
}
let subject_subscription = self.state_subject.1.subscribe(s);
let cs = Arc::clone(&self.connected_subscription);
let connected = Arc::clone(&self.connected);
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
if !*connected.lock().unwrap() {
subject_subscription.unsubscribe();
}
let connectable_subscription = cs.lock().unwrap().take();
if let Some(connectable_subscription) = connectable_subscription {
connectable_subscription.unsubscribe();
}
})),
SubscriptionHandle::Nil,
)
}
fn is_subject(&self) -> bool {
self.subject
}
fn set_subject_indicator(&mut self, s: bool) {
self.subject = s;
}
}