1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use crate::prelude::*;
mod thread_scheduler;
use thread_scheduler::new_thread_schedule;
mod thread_pool_scheduler;
use crate::observable::{from_future::DEFAULT_RUNTIME, interval::SpawnHandle};
use futures::prelude::*;
use futures::task::SpawnExt;
use futures_timer::Delay;
use std::time::Duration;
use thread_pool_scheduler::thread_pool_schedule;
pub trait Scheduler {
fn schedule<T: Send + 'static>(
&self,
task: impl FnOnce(SharedSubscription, T) + Send + 'static,
delay: Option<Duration>,
state: T,
) -> SharedSubscription;
}
pub enum Schedulers {
NewThread,
ThreadPool,
}
impl Scheduler for Schedulers {
fn schedule<T: Send + 'static>(
&self,
task: impl FnOnce(SharedSubscription, T) + Send + 'static,
delay: Option<Duration>,
state: T,
) -> SharedSubscription {
match self {
Schedulers::NewThread => new_thread_schedule(task, delay, state),
Schedulers::ThreadPool => thread_pool_schedule(task, delay, state),
}
}
}
pub fn delay_task(
delay: Duration,
task: impl FnOnce() + Send + 'static,
) -> SpawnHandle<Result<(), std::io::Error>> {
let f = Delay::new(delay).inspect(|_| {
task();
});
let handle = DEFAULT_RUNTIME
.lock()
.unwrap()
.spawn_with_handle(f)
.expect("spawn task to thread pool failed.");
SpawnHandle::new(handle)
}
#[cfg(test)]
mod test {
extern crate test;
use crate::ops::ObserveOn;
use crate::prelude::*;
use crate::scheduler::Schedulers;
use std::f32;
use std::sync::{Arc, Mutex};
use test::Bencher;
#[bench]
fn pool(b: &mut Bencher) { b.iter(|| sum_of_sqrt(Schedulers::ThreadPool)) }
#[bench]
fn new_thread(b: &mut Bencher) {
b.iter(|| sum_of_sqrt(Schedulers::NewThread))
}
fn sum_of_sqrt(scheduler: Schedulers) {
let sum = Arc::new(Mutex::new(0.));
observable::from_iter(0..10)
.observe_on(scheduler)
.subscribe(move |v| {
*sum.lock().unwrap() =
(0..1000).fold((v as f32).sqrt(), |acc, _| acc.sqrt());
});
}
}