rxrust/
scheduler.rs

1use crate::prelude::*;
2use async_std::prelude::FutureExt as AsyncFutureExt;
3use futures::future::{lazy, AbortHandle, FutureExt};
4use std::future::Future;
5
6use futures::StreamExt;
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, Instant};
9
10pub fn task_future<T>(
11  task: impl FnOnce(T) + 'static,
12  state: T,
13  delay: Option<Duration>,
14) -> (impl Future<Output = ()>, SpawnHandle) {
15  let fut = lazy(|_| task(state)).delay(delay.unwrap_or_default());
16  let (fut, handle) = futures::future::abortable(fut);
17  (fut.map(|_| ()), SpawnHandle::new(handle))
18}
19
20/// A Scheduler is an object to order task and schedule their execution.
21pub trait SharedScheduler {
22  fn spawn<Fut>(&self, future: Fut)
23  where
24    Fut: Future<Output = ()> + Send + 'static;
25
26  fn schedule<T: Send + 'static>(
27    &self,
28    task: impl FnOnce(T) + Send + 'static,
29    delay: Option<Duration>,
30    state: T,
31  ) -> SpawnHandle {
32    let (f, handle) = task_future(task, state, delay);
33    self.spawn(f);
34    handle
35  }
36
37  fn schedule_repeating(
38    &self,
39    task: impl FnMut(usize) + Send + 'static,
40    time_between: Duration,
41    at: Option<Instant>,
42  ) -> SpawnHandle {
43    let (f, handle) = repeating_future(task, time_between, at);
44    self.spawn(f.map(|_| ()));
45    handle
46  }
47}
48
49pub trait LocalScheduler {
50  fn spawn<Fut>(&self, future: Fut)
51  where
52    Fut: Future<Output = ()> + 'static;
53
54  fn schedule<T: 'static>(
55    &self,
56    task: impl FnOnce(T) + 'static,
57    delay: Option<Duration>,
58    state: T,
59  ) -> SpawnHandle {
60    let (f, handle) = task_future(task, state, delay);
61    self.spawn(f);
62    handle
63  }
64
65  fn schedule_repeating(
66    &self,
67    task: impl FnMut(usize) + 'static,
68    time_between: Duration,
69    at: Option<Instant>,
70  ) -> SpawnHandle {
71    let (f, handle) = repeating_future(task, time_between, at);
72    self.spawn(f.map(|_| ()));
73    handle
74  }
75}
76
77#[derive(Clone)]
78pub struct SpawnHandle {
79  pub handle: AbortHandle,
80  is_closed: Arc<RwLock<bool>>,
81}
82
83impl SpawnHandle {
84  #[inline]
85  pub fn new(handle: AbortHandle) -> Self {
86    SpawnHandle {
87      handle,
88      is_closed: Arc::new(RwLock::new(false)),
89    }
90  }
91}
92
93impl SubscriptionLike for SpawnHandle {
94  fn unsubscribe(&mut self) {
95    let was_closed = *self.is_closed.read().unwrap();
96    if !was_closed {
97      *self.is_closed.write().unwrap() = true;
98      self.handle.abort();
99    }
100  }
101
102  #[inline]
103  fn is_closed(&self) -> bool { *self.is_closed.read().unwrap() }
104}
105
/// Scheduler implementations backed by the executors of the `futures` crate.
#[cfg(feature = "futures-scheduler")]
mod futures_scheduler {
  use crate::scheduler::{LocalScheduler, SharedScheduler};
  use futures::executor::{LocalSpawner, ThreadPool};
  use futures::task::{LocalSpawnExt, SpawnExt};
  use futures::Future;

  impl SharedScheduler for ThreadPool {
    /// Spawns `future` on the thread pool.
    ///
    /// Panics if the pool reports a spawn error.
    fn spawn<Fut>(&self, future: Fut)
    where
      Fut: Future<Output = ()> + Send + 'static,
    {
      // Fully qualified so the call resolves to the executor's `spawn`
      // rather than to this trait method.
      SpawnExt::spawn(self, future).unwrap();
    }
  }

  impl LocalScheduler for LocalSpawner {
    /// Spawns `future` on the local spawner.
    ///
    /// Panics if the spawner reports a spawn error.
    fn spawn<Fut>(&self, future: Fut)
    where
      Fut: Future<Output = ()> + 'static,
    {
      // `future` already outputs `()`, so it is handed over unchanged.
      LocalSpawnExt::spawn_local(self, future).unwrap();
    }
  }
}
133
134fn repeating_future(
135  task: impl FnMut(usize) + 'static,
136  time_between: Duration,
137  at: Option<Instant>,
138) -> (impl Future<Output = ()>, SpawnHandle) {
139  let now = Instant::now();
140  let delay = at.map(|inst| {
141    if inst > now {
142      inst - now
143    } else {
144      Duration::from_micros(0)
145    }
146  });
147  let future = to_interval(task, time_between, delay.unwrap_or(time_between));
148  let (fut, handle) = futures::future::abortable(future);
149  (fut.map(|_| ()), SpawnHandle::new(handle))
150}
151
152fn to_interval(
153  mut task: impl FnMut(usize) + 'static,
154  interval_duration: Duration,
155  delay: Duration,
156) -> impl Future<Output = ()> {
157  let mut number = 0;
158
159  futures::future::ready(())
160    .then(move |_| {
161      task(number);
162      async_std::stream::interval(interval_duration).for_each(move |_| {
163        number += 1;
164        task(number);
165        futures::future::ready(())
166      })
167    })
168    .delay(delay)
169}
170
/// Scheduler implementations backed by a tokio `Runtime`.
#[cfg(feature = "tokio-scheduler")]
mod tokio_scheduler {
  use super::*;
  use std::sync::Arc;
  use tokio::runtime::Runtime;

  impl SharedScheduler for Runtime {
    /// Spawns `future` on the runtime; the value returned by the runtime's
    /// `spawn` is dropped.
    fn spawn<Fut>(&self, future: Fut)
    where
      Fut: Future<Output = ()> + Send + 'static,
    {
      // Fully qualified so the call resolves to the runtime's own `spawn`.
      let _join_handle = Runtime::spawn(self, future);
    }
  }

  impl SharedScheduler for Arc<Runtime> {
    /// Spawns `future` on the shared runtime; the value returned by the
    /// runtime's `spawn` is dropped.
    fn spawn<Fut>(&self, future: Fut)
    where
      Fut: Future<Output = ()> + Send + 'static,
    {
      // Deref the `Arc` explicitly; calling `self.spawn(..)` here would
      // resolve to this trait method and recurse.
      let runtime: &Runtime = self;
      let _join_handle = Runtime::spawn(runtime, future);
    }
  }
}
195
/// Benchmarks comparing `observe_on` with the different schedulers. Each
/// benchmark group is also wrapped in a `#[test]` so it runs with the tests.
#[cfg(all(test, feature = "tokio-scheduler"))]
mod test {
  use crate::prelude::*;
  use bencher::Bencher;
  use futures::executor::{LocalPool, ThreadPool};
  use std::sync::{Arc, Mutex};

  /// CPU-bound busy work used as the per-item workload of every benchmark.
  fn waste_time(v: u32) -> u32 {
    (0..v)
      .map(|index| {
        let partial: u32 = (0..index).sum();
        // Capping each term at `u32::MAX / v` keeps the outer sum in range.
        partial.min(u32::MAX / v)
      })
      .sum()
  }

  #[test]
  fn bench_pool() { do_bench_pool(); }

  benchmark_group!(do_bench_pool, pool);

  /// Benchmarks `observe_on` with a `futures` thread pool.
  fn pool(b: &mut Bencher) {
    let last = Arc::new(Mutex::new(0));
    b.iter(|| {
      let sink = Arc::clone(&last);
      let scheduler = ThreadPool::new().unwrap();
      observable::from_iter(0..1000)
        .observe_on(scheduler)
        .map(waste_time)
        .into_shared()
        .subscribe(move |v| *sink.lock().unwrap() = v);

      // TODO: there is no way to wait until all tasks in the `ThreadPool`
      // have finished.

      *last.lock().unwrap()
    })
  }

  #[test]
  fn bench_local_thread() { do_bench_local_thread(); }

  benchmark_group!(do_bench_local_thread, local_thread);

  /// Benchmarks `observe_on` with a single-threaded `LocalPool`.
  fn local_thread(b: &mut Bencher) {
    let last = Arc::new(Mutex::new(0));
    b.iter(|| {
      let sink = Arc::clone(&last);
      let mut executor = LocalPool::new();
      observable::from_iter(0..1000)
        .observe_on(executor.spawner())
        .map(waste_time)
        .subscribe(move |v| *sink.lock().unwrap() = v);
      // Drive every spawned task to completion before reading the result.
      executor.run();
      *last.lock().unwrap()
    })
  }

  #[test]
  fn bench_tokio_basic() { do_bench_tokio_basic(); }

  benchmark_group!(do_bench_tokio_basic, tokio_basic);

  /// Benchmarks `observe_on` with a current-thread tokio runtime.
  fn tokio_basic(b: &mut Bencher) {
    use tokio::runtime::Builder;
    let last = Arc::new(Mutex::new(0));
    b.iter(|| {
      let sink = Arc::clone(&last);
      let scheduler = Builder::new_current_thread().build().unwrap();

      observable::from_iter(0..1000)
        .observe_on(scheduler)
        .map(waste_time)
        .into_shared()
        .subscribe(move |v| *sink.lock().unwrap() = v);

      // TODO: there is no way to wait until all tasks in the `Tokio`
      // scheduler have finished.
      *last.lock().unwrap()
    })
  }

  #[test]
  fn bench_tokio_thread() { do_bench_tokio_thread(); }

  benchmark_group!(do_bench_tokio_thread, tokio_thread);

  /// Benchmarks `observe_on` with a default tokio runtime.
  fn tokio_thread(b: &mut Bencher) {
    use tokio::runtime::Runtime;
    let last = Arc::new(Mutex::new(0));
    b.iter(|| {
      let sink = Arc::clone(&last);
      let scheduler = Runtime::new().unwrap();
      observable::from_iter(0..1000)
        .observe_on(scheduler)
        .map(waste_time)
        .into_shared()
        .subscribe(move |v| *sink.lock().unwrap() = v);

      // TODO: there is no way to wait until all tasks in the `Tokio`
      // scheduler have finished.

      *last.lock().unwrap()
    })
  }
}
296}