use crate::prelude::*;
use crate::scheduler::Scheduler;
/// Extension trait providing the `subscribe_on` operator.
///
/// The trait has a single provided method, so implementors do not need to
/// supply anything themselves.
pub trait SubscribeOn {
    /// Pairs `self` with `scheduler` inside a [`SubscribeOnOP`].
    ///
    /// This method only stores the two values; it does not call into the
    /// scheduler itself.
    fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD>
    where
        Self: Sized,
    {
        let source = self;
        SubscribeOnOP { source, scheduler }
    }
}
/// Operator value returned by `SubscribeOn::subscribe_on`.
///
/// It holds the upstream source together with the scheduler; when the
/// operator is subscribed (`raw_subscribe`), the scheduler's `schedule`
/// method is invoked and the source is subscribed from inside the scheduled
/// closure.
pub struct SubscribeOnOP<S, SD> {
/// The value `subscribe_on` was called on.
source: S,
/// The scheduler passed to `subscribe_on`.
scheduler: SD,
}
/// A `SubscribeOnOP` that is `Send + Sync + 'static` is its own shared form.
impl<S, SD> IntoShared for SubscribeOnOP<S, SD>
where
    SubscribeOnOP<S, SD>: Send + Sync + 'static,
{
    type Shared = SubscribeOnOP<S, SD>;

    /// Returns the operator unchanged; no conversion is performed.
    #[inline(always)]
    fn to_shared(self) -> Self::Shared {
        self
    }
}
// Blanket implementation: every (sized) type `T` receives the provided
// `subscribe_on` method from the `SubscribeOn` trait.
impl<T> SubscribeOn for T {}
impl<Item, Err, O, S, SD> RawSubscribable<Item, Err, O> for SubscribeOnOP<S, SD>
where
O: IntoShared,
S: IntoShared,
S::Shared: RawSubscribable<Item, Err, O::Shared, Unsub = SharedSubscription>,
SD: Scheduler,
{
type Unsub = SharedSubscription;
fn raw_subscribe(self, subscriber: O) -> Self::Unsub {
let source = self.source.to_shared();
let subscriber = subscriber.to_shared();
self.scheduler.schedule(
move |mut subscription, _| {
subscription.add(source.raw_subscribe(subscriber))
},
None,
(),
)
}
}
#[cfg(test)]
mod test {
    use crate::ops::{Delay, SubscribeOn};
    use crate::prelude::*;
    use crate::scheduler::Schedulers;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::{Duration, Instant};

    /// All values pushed through `subscribe_on(Schedulers::NewThread)` must
    /// arrive, and they must be observed on a thread other than the one
    /// running the test.
    #[test]
    fn new_thread() {
        let res = Arc::new(Mutex::new(vec![]));
        let c_res = res.clone();
        let thread_ids = Arc::new(Mutex::new(vec![]));
        let c_thread_ids = thread_ids.clone();

        observable::from_iter!(1..5)
            .subscribe_on(Schedulers::NewThread)
            .subscribe(move |v| {
                res.lock().unwrap().push(*v);
                let handle = thread::current();
                thread_ids.lock().unwrap().push(handle.id());
            });

        // Poll with a deadline instead of sleeping for a fixed 1 ms: on a
        // loaded machine the worker thread may not have emitted yet, which
        // made the assertions below race (and index into an empty vector).
        // The lock guard in the condition is released before the body runs.
        let deadline = Instant::now() + Duration::from_secs(1);
        while c_res.lock().unwrap().len() < 4 && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(1));
        }

        assert_eq!(*c_res.lock().unwrap(), (1..5).collect::<Vec<_>>());
        assert_ne!(c_thread_ids.lock().unwrap()[0], thread::current().id());
    }

    /// Unsubscribing must suppress emissions when running on the thread pool.
    #[test]
    fn pool_unsubscribe() {
        unsubscribe_scheduler(Schedulers::ThreadPool)
    }

    /// Unsubscribing must suppress emissions when running on a new thread.
    #[test]
    fn new_thread_unsubscribe() {
        unsubscribe_scheduler(Schedulers::NewThread)
    }

    /// Subscribes through `scheduler` with a 10 ms delay, unsubscribes
    /// immediately, waits past the delay and checks that nothing was emitted.
    fn unsubscribe_scheduler(scheduler: Schedulers) {
        let emitted = Arc::new(Mutex::new(vec![]));
        let c_emitted = emitted.clone();

        observable::from_iter!(0..10)
            .subscribe_on(scheduler)
            .delay(Duration::from_millis(10))
            .subscribe(move |v| {
                emitted.lock().unwrap().push(*v);
            })
            .unsubscribe();

        // A fixed wait is required here: the test asserts that nothing
        // happens, so there is no event to poll for.
        thread::sleep(Duration::from_millis(20));
        assert_eq!(c_emitted.lock().unwrap().len(), 0);
    }
}