1use crate::prelude::*;
2use async_std::prelude::FutureExt as AsyncFutureExt;
3use futures::future::{lazy, AbortHandle, FutureExt};
4use std::future::Future;
5
6use futures::StreamExt;
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, Instant};
9
10pub fn task_future<T>(
11 task: impl FnOnce(T) + 'static,
12 state: T,
13 delay: Option<Duration>,
14) -> (impl Future<Output = ()>, SpawnHandle) {
15 let fut = lazy(|_| task(state)).delay(delay.unwrap_or_default());
16 let (fut, handle) = futures::future::abortable(fut);
17 (fut.map(|_| ()), SpawnHandle::new(handle))
18}
19
20pub trait SharedScheduler {
22 fn spawn<Fut>(&self, future: Fut)
23 where
24 Fut: Future<Output = ()> + Send + 'static;
25
26 fn schedule<T: Send + 'static>(
27 &self,
28 task: impl FnOnce(T) + Send + 'static,
29 delay: Option<Duration>,
30 state: T,
31 ) -> SpawnHandle {
32 let (f, handle) = task_future(task, state, delay);
33 self.spawn(f);
34 handle
35 }
36
37 fn schedule_repeating(
38 &self,
39 task: impl FnMut(usize) + Send + 'static,
40 time_between: Duration,
41 at: Option<Instant>,
42 ) -> SpawnHandle {
43 let (f, handle) = repeating_future(task, time_between, at);
44 self.spawn(f.map(|_| ()));
45 handle
46 }
47}
48
49pub trait LocalScheduler {
50 fn spawn<Fut>(&self, future: Fut)
51 where
52 Fut: Future<Output = ()> + 'static;
53
54 fn schedule<T: 'static>(
55 &self,
56 task: impl FnOnce(T) + 'static,
57 delay: Option<Duration>,
58 state: T,
59 ) -> SpawnHandle {
60 let (f, handle) = task_future(task, state, delay);
61 self.spawn(f);
62 handle
63 }
64
65 fn schedule_repeating(
66 &self,
67 task: impl FnMut(usize) + 'static,
68 time_between: Duration,
69 at: Option<Instant>,
70 ) -> SpawnHandle {
71 let (f, handle) = repeating_future(task, time_between, at);
72 self.spawn(f.map(|_| ()));
73 handle
74 }
75}
76
77#[derive(Clone)]
78pub struct SpawnHandle {
79 pub handle: AbortHandle,
80 is_closed: Arc<RwLock<bool>>,
81}
82
83impl SpawnHandle {
84 #[inline]
85 pub fn new(handle: AbortHandle) -> Self {
86 SpawnHandle {
87 handle,
88 is_closed: Arc::new(RwLock::new(false)),
89 }
90 }
91}
92
93impl SubscriptionLike for SpawnHandle {
94 fn unsubscribe(&mut self) {
95 let was_closed = *self.is_closed.read().unwrap();
96 if !was_closed {
97 *self.is_closed.write().unwrap() = true;
98 self.handle.abort();
99 }
100 }
101
102 #[inline]
103 fn is_closed(&self) -> bool { *self.is_closed.read().unwrap() }
104}
105
106#[cfg(feature = "futures-scheduler")]
107mod futures_scheduler {
108 use crate::scheduler::{LocalScheduler, SharedScheduler};
109 use futures::{
110 executor::{LocalSpawner, ThreadPool},
111 task::{LocalSpawnExt, SpawnExt},
112 Future, FutureExt,
113 };
114
115 impl SharedScheduler for ThreadPool {
116 fn spawn<Fut>(&self, future: Fut)
117 where
118 Fut: Future<Output = ()> + Send + 'static,
119 {
120 SpawnExt::spawn(self, future).unwrap();
121 }
122 }
123
124 impl LocalScheduler for LocalSpawner {
125 fn spawn<Fut>(&self, future: Fut)
126 where
127 Fut: Future<Output = ()> + 'static,
128 {
129 self.spawn_local(future.map(|_| ())).unwrap();
130 }
131 }
132}
133
134fn repeating_future(
135 task: impl FnMut(usize) + 'static,
136 time_between: Duration,
137 at: Option<Instant>,
138) -> (impl Future<Output = ()>, SpawnHandle) {
139 let now = Instant::now();
140 let delay = at.map(|inst| {
141 if inst > now {
142 inst - now
143 } else {
144 Duration::from_micros(0)
145 }
146 });
147 let future = to_interval(task, time_between, delay.unwrap_or(time_between));
148 let (fut, handle) = futures::future::abortable(future);
149 (fut.map(|_| ()), SpawnHandle::new(handle))
150}
151
152fn to_interval(
153 mut task: impl FnMut(usize) + 'static,
154 interval_duration: Duration,
155 delay: Duration,
156) -> impl Future<Output = ()> {
157 let mut number = 0;
158
159 futures::future::ready(())
160 .then(move |_| {
161 task(number);
162 async_std::stream::interval(interval_duration).for_each(move |_| {
163 number += 1;
164 task(number);
165 futures::future::ready(())
166 })
167 })
168 .delay(delay)
169}
170
171#[cfg(feature = "tokio-scheduler")]
172mod tokio_scheduler {
173 use super::*;
174 use std::sync::Arc;
175 use tokio::runtime::Runtime;
176
177 impl SharedScheduler for Runtime {
178 fn spawn<Fut>(&self, future: Fut)
179 where
180 Fut: Future<Output = ()> + Send + 'static,
181 {
182 Runtime::spawn(self, future);
183 }
184 }
185
186 impl SharedScheduler for Arc<Runtime> {
187 fn spawn<Fut>(&self, future: Fut)
188 where
189 Fut: Future<Output = ()> + Send + 'static,
190 {
191 Runtime::spawn(self, future);
192 }
193 }
194}
195
196#[cfg(all(test, feature = "tokio-scheduler"))]
197mod test {
198 use crate::prelude::*;
199 use bencher::Bencher;
200 use futures::executor::{LocalPool, ThreadPool};
201 use std::sync::{Arc, Mutex};
202
203 fn waste_time(v: u32) -> u32 {
204 (0..v)
205 .into_iter()
206 .map(|index| (0..index).sum::<u32>().min(u32::MAX / v))
207 .sum()
208 }
209
210 #[test]
211 fn bench_pool() { do_bench_pool(); }
212
213 benchmark_group!(do_bench_pool, pool);
214
215 fn pool(b: &mut Bencher) {
216 let last = Arc::new(Mutex::new(0));
217 b.iter(|| {
218 let c_last = last.clone();
219 let pool = ThreadPool::new().unwrap();
220 observable::from_iter(0..1000)
221 .observe_on(pool)
222 .map(waste_time)
223 .into_shared()
224 .subscribe(move |v| *c_last.lock().unwrap() = v);
225
226 *last.lock().unwrap()
229 })
230 }
231
232 #[test]
233 fn bench_local_thread() { do_bench_local_thread(); }
234
235 benchmark_group!(do_bench_local_thread, local_thread);
236
237 fn local_thread(b: &mut Bencher) {
238 let last = Arc::new(Mutex::new(0));
239 b.iter(|| {
240 let c_last = last.clone();
241 let mut local = LocalPool::new();
242 observable::from_iter(0..1000)
243 .observe_on(local.spawner())
244 .map(waste_time)
245 .subscribe(move |v| *c_last.lock().unwrap() = v);
246 local.run();
247 *last.lock().unwrap()
248 })
249 }
250
251 #[test]
252 fn bench_tokio_basic() { do_bench_tokio_basic(); }
253
254 benchmark_group!(do_bench_tokio_basic, tokio_basic);
255
256 fn tokio_basic(b: &mut Bencher) {
257 use tokio::runtime;
258 let last = Arc::new(Mutex::new(0));
259 b.iter(|| {
260 let c_last = last.clone();
261 let local = runtime::Builder::new_current_thread().build().unwrap();
262
263 observable::from_iter(0..1000)
264 .observe_on(local)
265 .map(waste_time)
266 .into_shared()
267 .subscribe(move |v| *c_last.lock().unwrap() = v);
268
269 *last.lock().unwrap()
271 })
272 }
273
274 #[test]
275 fn bench_tokio_thread() { do_bench_tokio_thread(); }
276
277 benchmark_group!(do_bench_tokio_thread, tokio_thread);
278
279 fn tokio_thread(b: &mut Bencher) {
280 use tokio::runtime;
281 let last = Arc::new(Mutex::new(0));
282 b.iter(|| {
283 let c_last = last.clone();
284 let pool = runtime::Runtime::new().unwrap();
285 observable::from_iter(0..1000)
286 .observe_on(pool)
287 .map(waste_time)
288 .into_shared()
289 .subscribe(move |v| *c_last.lock().unwrap() = v);
290
291 *last.lock().unwrap()
294 })
295 }
296}