use std::{
collections::VecDeque,
error::Error,
sync::{Arc, Mutex},
time::Instant,
};
use crate::{
observer::Observer,
subscribe::Unsubscribeable,
subscription::subscribe::{
Subscribeable, Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic,
},
Observable,
};
struct EmittedValueEntry<T>(T, Instant);
impl<T> EmittedValueEntry<T> {
pub fn new(v: T) -> Self {
EmittedValueEntry(v, Instant::now())
}
pub fn is_fresh(&self, window_size_ms: u128) -> bool {
self.1.elapsed().as_millis() <= window_size_ms
}
}
pub enum BufSize {
Unbounded,
Bounded(usize),
}
pub struct ReplaySubject<T> {
buf_size: BufSize,
window_size: Option<u128>,
values: VecDeque<EmittedValueEntry<T>>,
observers: Vec<(u64, Subscriber<T>)>,
completed: bool,
closed: bool,
error: Option<Arc<dyn Error + Send + Sync>>,
}
impl<T: Send + Sync + 'static> ReplaySubject<T> {
#[must_use]
pub fn emitter_receiver(
buf_size: BufSize,
) -> (ReplaySubjectEmitter<T>, ReplaySubjectReceiver<T>) {
let mut s = ReplaySubject {
buf_size,
window_size: None,
values: VecDeque::new(),
observers: Vec::with_capacity(16),
completed: false,
closed: false,
error: None,
};
match s.buf_size {
BufSize::Unbounded => s.values = VecDeque::with_capacity(16),
BufSize::Bounded(size) => s.values = VecDeque::with_capacity(size),
}
let s = Arc::new(Mutex::new(s));
(
ReplaySubjectEmitter(Arc::clone(&s)),
ReplaySubjectReceiver(Arc::clone(&s)),
)
}
#[must_use]
pub fn emitter_receiver_time_aware(
buf_size: BufSize,
window_size_ms: u128,
) -> (ReplaySubjectEmitter<T>, ReplaySubjectReceiver<T>) {
let mut s = ReplaySubject {
buf_size,
window_size: Some(window_size_ms),
values: VecDeque::new(),
observers: Vec::with_capacity(16),
completed: false,
closed: false,
error: None,
};
match s.buf_size {
BufSize::Unbounded => s.values = VecDeque::with_capacity(16),
BufSize::Bounded(size) => s.values = VecDeque::with_capacity(size),
}
let s = Arc::new(Mutex::new(s));
(
ReplaySubjectEmitter(Arc::clone(&s)),
ReplaySubjectReceiver(Arc::clone(&s)),
)
}
}
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct ReplaySubjectReceiver<T>(Arc<Mutex<ReplaySubject<T>>>);
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct ReplaySubjectEmitter<T>(Arc<Mutex<ReplaySubject<T>>>);
impl<T> ReplaySubjectReceiver<T> {
#[must_use]
pub fn len(&self) -> usize {
self.0.lock().unwrap().observers.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl<T> crate::subscription::subscribe::Fuse for ReplaySubjectReceiver<T> {}
impl<T: Clone + Send + Sync + 'static> Subscribeable for ReplaySubjectReceiver<T> {
type ObsType = T;
fn subscribe(&mut self, mut v: Subscriber<Self::ObsType>) -> Subscription {
let key: u64 = super::gen_key().next().unwrap_or(super::random_seed());
if let Ok(mut src) = self.0.lock() {
if src.closed {
return Subscription::subject_subscription(
UnsubscribeLogic::Nil,
SubscriptionHandle::Nil,
);
}
if let Some(window_size_ms) = src.window_size {
src.values.retain(|e| e.is_fresh(window_size_ms));
}
for value in &src.values {
v.next(value.0.clone());
}
if src.completed {
if let Some(err) = &src.error {
v.error(Arc::clone(err));
} else {
v.complete();
}
return Subscription::subject_subscription(
UnsubscribeLogic::Nil,
SubscriptionHandle::Nil,
);
}
src.observers.push((key, v));
} else {
return Subscription::subject_subscription(
UnsubscribeLogic::Nil,
SubscriptionHandle::Nil,
);
};
let source_cloned = Arc::clone(&self.0);
Subscription::subject_subscription(
UnsubscribeLogic::Logic(Box::new(move || {
source_cloned
.lock()
.unwrap()
.observers
.retain(move |v| v.0 != key);
})),
SubscriptionHandle::Nil,
)
}
}
impl<T> Unsubscribeable for ReplaySubjectReceiver<T> {
fn unsubscribe(self) {
if let Ok(mut r) = self.0.lock() {
r.closed = true;
r.observers.clear();
}
}
}
impl<T: Clone> Observer for ReplaySubjectEmitter<T> {
type NextFnType = T;
fn next(&mut self, v: Self::NextFnType) {
if let Ok(mut src) = self.0.lock() {
if src.completed || src.closed {
return;
}
match src.buf_size {
BufSize::Unbounded => src.values.push_back(EmittedValueEntry::new(v.clone())),
BufSize::Bounded(buf_size) => {
if src.values.len() == buf_size {
src.values.pop_front();
}
if buf_size > 0 {
src.values.push_back(EmittedValueEntry::new(v.clone()));
}
}
};
} else {
return;
}
for (_, o) in &mut self.0.lock().unwrap().observers {
o.next(v.clone());
}
}
fn error(&mut self, e: Arc<dyn Error + Send + Sync>) {
if let Ok(mut src) = self.0.lock() {
if src.completed || src.closed {
return;
}
for (_, o) in &mut src.observers {
o.error(e.clone());
}
src.completed = true;
src.error = Some(e);
src.observers.clear();
}
}
fn complete(&mut self) {
if let Ok(mut src) = self.0.lock() {
if src.completed || src.closed {
return;
}
for (_, o) in &mut src.observers {
o.complete();
}
src.completed = true;
src.observers.clear();
}
}
}
impl<T: Clone + Send + 'static> From<ReplaySubjectEmitter<T>> for Subscriber<T> {
fn from(mut value: ReplaySubjectEmitter<T>) -> Self {
let mut vn = value.clone();
let mut ve = value.clone();
Subscriber::new(
move |v| {
vn.next(v);
},
move |e| ve.error(e),
move || value.complete(),
)
}
}
impl<T: Clone + Send + Sync + 'static> From<ReplaySubjectReceiver<T>> for Observable<T> {
fn from(mut value: ReplaySubjectReceiver<T>) -> Self {
Observable::new(move |subscriber| value.subscribe(subscriber))
}
}