use std::{
error::Error,
sync::{Arc, Mutex},
};
use crate::{
observer::Observer,
subscribe::Unsubscribeable,
subscription::subscribe::{
Subscribeable, Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic,
},
Observable,
};
/// Shared state behind a [`SubjectEmitter`] / [`SubjectReceiver`] pair.
///
/// Both handles wrap the same `Arc<Mutex<Subject<T>>>`; the emitter pushes
/// notifications into this state and the receiver registers subscribers on it.
pub struct Subject<T> {
/// Registered subscribers, each paired with the key that its subscription
/// uses to remove it again on unsubscribe.
observers: Vec<(u64, Subscriber<T>)>,
/// Set once the emitter has delivered `complete` or `error`.
completed: bool,
/// Set when the receiver itself is unsubscribed.
closed: bool,
/// The terminal error, if the subject ended with one; it is replayed to
/// subscribers that arrive after the subject has finished.
error: Option<Arc<dyn Error + Send + Sync>>,
}
impl<T: 'static> Subject<T> {
    /// Creates a fresh subject and returns its two handles.
    ///
    /// The returned [`SubjectEmitter`] and [`SubjectReceiver`] share one
    /// underlying `Subject` that starts out open (neither completed nor
    /// closed), without a stored error, and with room reserved for 16
    /// subscribers.
    #[must_use]
    pub fn emitter_receiver() -> (SubjectEmitter<T>, SubjectReceiver<T>) {
        let initial = Subject {
            observers: Vec::with_capacity(16),
            completed: false,
            closed: false,
            error: None,
        };
        let shared = Arc::new(Mutex::new(initial));

        let emitter = SubjectEmitter(Arc::clone(&shared));
        // The receiver takes over the original handle; no extra clone is needed.
        let receiver = SubjectReceiver(shared);
        (emitter, receiver)
    }
}
/// Subscribing side of a [`Subject`].
///
/// Holds a shared handle (`Arc<Mutex<..>>`) to the subject state, so a clone
/// refers to the same subject. The derived `Clone` impl is only available
/// when `T: Clone`.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct SubjectReceiver<T>(Arc<Mutex<Subject<T>>>);
/// Emitting side of a [`Subject`].
///
/// Holds a shared handle (`Arc<Mutex<..>>`) to the subject state, so a clone
/// refers to the same subject. The derived `Clone` impl is only available
/// when `T: Clone`.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct SubjectEmitter<T>(Arc<Mutex<Subject<T>>>);
impl<T> SubjectReceiver<T> {
    /// Returns the number of subscribers currently registered on the subject.
    ///
    /// # Panics
    ///
    /// Panics if the subject's mutex is poisoned.
    #[must_use]
    pub fn len(&self) -> usize {
        let state = self.0.lock().unwrap();
        state.observers.len()
    }

    /// Returns `true` when no subscribers are currently registered.
    ///
    /// # Panics
    ///
    /// Panics if the subject's mutex is poisoned.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let state = self.0.lock().unwrap();
        state.observers.is_empty()
    }
}
// Opts `SubjectReceiver` into the `Fuse` trait; the empty body means no
// trait items are overridden here.
impl<T> crate::subscription::subscribe::Fuse for SubjectReceiver<T> {}
impl<T: 'static> Subscribeable for SubjectReceiver<T> {
    type ObsType = T;

    /// Registers `v` on the subject and returns a subscription that removes
    /// it again when unsubscribed.
    ///
    /// An inert subscription (nothing to unsubscribe) is returned, and `v`
    /// is not registered, when:
    ///
    /// * the receiver has been closed — `v` gets no notification at all;
    /// * the subject already finished — `v` is immediately handed the stored
    ///   error if there is one, otherwise `complete`;
    /// * the subject's mutex is poisoned.
    fn subscribe(&mut self, mut v: Subscriber<Self::ObsType>) -> Subscription {
        // Only fall back to a random seed when the key generator is
        // exhausted; the fallback is not evaluated otherwise.
        let key: u64 = super::gen_key()
            .next()
            .unwrap_or_else(super::random_seed);

        // The guard lives only inside this `match`, so the lock is released
        // before the subscription object is built.
        let registered = match self.0.lock() {
            Ok(mut src) => {
                if src.closed {
                    false
                } else if src.completed {
                    // Late subscriber: replay the terminal notification.
                    if let Some(err) = &src.error {
                        v.error(Arc::clone(err));
                    } else {
                        v.complete();
                    }
                    false
                } else {
                    src.observers.push((key, v));
                    true
                }
            }
            Err(_) => false,
        };

        if !registered {
            return Subscription::subject_subscription(
                UnsubscribeLogic::Nil,
                SubscriptionHandle::Nil,
            );
        }

        let source_cloned = Arc::clone(&self.0);
        Subscription::subject_subscription(
            UnsubscribeLogic::Logic(Box::new(move || {
                // Tolerate a poisoned lock instead of panicking, matching the
                // other mutating paths on the subject.
                if let Ok(mut src) = source_cloned.lock() {
                    src.observers.retain(|entry| entry.0 != key);
                }
            })),
            SubscriptionHandle::Nil,
        )
    }
}
impl<T> Unsubscribeable for SubjectReceiver<T> {
    /// Closes the subject and drops every registered subscriber.
    ///
    /// The subscribers are removed without being sent `complete` or `error`.
    /// If the subject's mutex is poisoned this is a no-op.
    fn unsubscribe(self) {
        if let Ok(mut subject) = self.0.lock() {
            subject.closed = true;
            subject.observers.clear();
        }
    }
}
/// Fans notifications from the emitter out to every registered subscriber.
///
/// All three methods are no-ops once the subject has completed, errored, or
/// been closed, and also when the subject's mutex is poisoned. Subscriber
/// callbacks run while the subject lock is held, so a callback that locks the
/// same subject again on the same thread may deadlock.
impl<T: Clone> Observer for SubjectEmitter<T> {
    type NextFnType = T;

    /// Delivers a clone of `v` to each registered subscriber.
    fn next(&mut self, v: Self::NextFnType) {
        // Check state and deliver under a single lock acquisition so that a
        // concurrent `complete`/`error`/`unsubscribe` cannot slip in between
        // the check and the delivery.
        if let Ok(mut src) = self.0.lock() {
            if src.completed || src.closed {
                return;
            }
            for (_, o) in &mut src.observers {
                o.next(v.clone());
            }
        }
    }

    /// Delivers `e` to each registered subscriber, then marks the subject as
    /// completed, stores `e` for late subscribers, and drops all subscribers.
    fn error(&mut self, e: Arc<dyn Error + Send + Sync>) {
        if let Ok(mut src) = self.0.lock() {
            if src.completed || src.closed {
                return;
            }
            for (_, o) in &mut src.observers {
                o.error(Arc::clone(&e));
            }
            src.completed = true;
            src.error = Some(e);
            src.observers.clear();
        }
    }

    /// Sends `complete` to each registered subscriber, then marks the subject
    /// as completed and drops all subscribers.
    fn complete(&mut self) {
        if let Ok(mut src) = self.0.lock() {
            if src.completed || src.closed {
                return;
            }
            for (_, o) in &mut src.observers {
                o.complete();
            }
            src.completed = true;
            src.observers.clear();
        }
    }
}
impl<T: Clone + 'static> From<SubjectEmitter<T>> for Subscriber<T> {
    /// Wraps the emitter in a `Subscriber` whose `next`, `error` and
    /// `complete` callbacks forward to the emitter's own methods.
    ///
    /// Each callback owns its own handle to the same subject.
    fn from(value: SubjectEmitter<T>) -> Self {
        let mut on_next = value.clone();
        let mut on_error = value.clone();
        let mut on_complete = value;
        Subscriber::new(
            move |item| {
                on_next.next(item);
            },
            move |err| on_error.error(err),
            move || on_complete.complete(),
        )
    }
}
impl<T: Clone + Send + Sync + 'static> From<SubjectReceiver<T>> for Observable<T> {
    /// Builds an `Observable` whose subscribe function hands each incoming
    /// subscriber to this receiver's `subscribe`.
    fn from(value: SubjectReceiver<T>) -> Self {
        let mut receiver = value;
        Observable::new(move |subscriber| receiver.subscribe(subscriber))
    }
}