use crate::prelude::*;
use std::sync::{Arc, RwLock};
#[derive(Clone)]
pub struct Replay<'a, Item>
where
Item: Clone + Send + Sync,
{
subject: subjects::ReplaySubject<'a, Item>,
source: Observable<'a, Item>,
subscription: Arc<RwLock<Option<Subscription<'a>>>>,
}
impl<'a, Item> Replay<'a, Item>
where
Item: Clone + Send + Sync,
{
pub fn new(source: Observable<'a, Item>) -> Replay<'a, Item> {
let _self = Replay {
subject: subjects::ReplaySubject::new(),
source,
subscription: Arc::new(RwLock::new(None)),
};
_self.set_ref_count();
_self
}
pub fn observable(&self) -> Observable<'a, Item> {
self.subject.observable()
}
fn set_ref_count(&self) {
{
let subscription = Arc::clone(&self.subscription);
self.subject.set_on_unsubscribe(move |count| {
if count == 0 {
if let Some(sbsc) = &*subscription.read().unwrap() {
sbsc.unsubscribe();
}
}
});
}
let source = self.source.clone();
let subject = self.subject.clone();
let subscription = Arc::clone(&self.subscription);
self.subject.set_on_subscribe(move |count| {
if count == 1 {
let sbj_next = subject.clone();
let sbj_error = subject.clone();
let sbj_complete = subject.clone();
let mut subscription = subscription.write().unwrap();
if subscription.is_some() {
return;
}
*subscription = Some(source.subscribe(
move |x| {
sbj_next.next(x);
},
move |e| {
sbj_error.error(e);
},
move || {
sbj_complete.complete();
},
));
}
});
}
}
impl<'a, Item> Observable<'a, Item>
where
Item: Clone + Send + Sync,
{
pub fn replay(&self) -> Replay<'a, Item> {
Replay::new(self.clone())
}
}
#[cfg(all(test, not(feature = "web")))]
mod test {
use crate::prelude::*;
use crate::{print_complete, print_error, print_next_fmt};
use schedulers::new_thread_scheduler;
use std::{thread, time};
#[test]
fn basic() {
let o = observables::from_iter(0..10)
.tap(
print_next_fmt!("tap {}"),
print_error!(),
print_complete!(),
)
.replay();
let obs = o.observable();
println!("start #1");
let sbsc1 = obs.subscribe(
print_next_fmt!("#1 {}"),
print_error!(),
print_complete!(),
);
println!("start #2");
let sbsc2 = obs.subscribe(
print_next_fmt!("#2 {}"),
print_error!(),
print_complete!(),
);
println!("end #1");
sbsc1.unsubscribe();
println!("end #2");
sbsc2.unsubscribe();
}
#[test]
fn thread() {
let o = observables::interval(
time::Duration::from_millis(100),
new_thread_scheduler(),
)
.tap(
print_next_fmt!("tap {}"),
print_error!(),
print_complete!(),
)
.replay();
let obs = o.observable();
println!("start #1");
let sbsc1 = obs.subscribe(
print_next_fmt!("#1 {}"),
print_error!(),
print_complete!(),
);
thread::sleep(time::Duration::from_millis(500));
println!("start #2");
let sbsc2 = obs.subscribe(
print_next_fmt!("#2 {}"),
print_error!(),
print_complete!(),
);
thread::sleep(time::Duration::from_millis(500));
println!("end #1");
sbsc1.unsubscribe();
thread::sleep(time::Duration::from_millis(500));
println!("end #2");
sbsc2.unsubscribe();
println!("final wait start");
thread::sleep(time::Duration::from_millis(500));
println!("final wait end");
}
}