rxrust/observable/
timer.rs

1use crate::prelude::*;
2use std::time::{Duration, Instant};
3
4// Returns an observable which will emit a single `item`
5// once after a given `dur` using a given `scheduler`
6pub fn timer<Item, S>(
7  item: Item,
8  dur: Duration,
9  scheduler: S,
10) -> ObservableBase<TimerEmitter<Item, S>> {
11  ObservableBase::new(TimerEmitter {
12    item,
13    dur,
14    scheduler,
15  })
16}
17
18// Returns an observable which will emit a single `item`
19// once at a given timestamp `at` using a given `scheduler`.
20// If timestamp `at` < `Instant::now()`, the observable will emit the item
21// immediately
22pub fn timer_at<Item, S>(
23  item: Item,
24  at: Instant,
25  scheduler: S,
26) -> ObservableBase<TimerEmitter<Item, S>> {
27  let duration = get_duration_from_instant(at);
28  ObservableBase::new(TimerEmitter {
29    item,
30    dur: duration,
31    scheduler,
32  })
33}
34
/// Calculates how much time is left between `Instant::now()` and `instant`.
///
/// Returns `Duration::default()` (a zero duration) when `instant` is not in
/// the future.
fn get_duration_from_instant(instant: Instant) -> Duration {
  // `saturating_duration_since` clamps to zero when `instant` lies before
  // the current time, so no explicit comparison is needed.
  instant.saturating_duration_since(Instant::now())
}
44
/// Emitter behind `observable::timer` and `observable::timer_at`.
///
/// Holds the `item` that will be emitted, the delay `dur` after which this
/// happens and the `scheduler` used to schedule the emission.
pub struct TimerEmitter<Item, S> {
  /// The single value handed to the observer's `next`.
  item: Item,
  /// Delay passed to the scheduler before the value is emitted.
  dur: Duration,
  /// Scheduler on which the delayed emission is scheduled.
  scheduler: S,
}
53
// Associated types of the timer emitter: it emits values of type `Item` and
// uses the unit type as its error type.
impl<Item, S> Emitter for TimerEmitter<Item, S> {
  // Type of the single emitted value.
  type Item = Item;
  // Unit error type; the `emit` implementations in this file only call
  // `next` and `complete` on the observer.
  type Err = ();
}
58
59impl<Item: 'static, S: LocalScheduler + 'static> LocalEmitter<'static>
60  for TimerEmitter<Item, S>
61{
62  fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
63  where
64    O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
65  {
66    let mut observer = subscriber.observer;
67    let item = self.item;
68    let dur = self.dur;
69
70    let handle = self.scheduler.schedule(
71      move |_| {
72        observer.next(item);
73        observer.complete();
74      },
75      Some(dur),
76      1,
77    );
78
79    subscriber.subscription.add(handle);
80  }
81}
82
83impl<Item: Send + 'static, S: SharedScheduler + 'static> SharedEmitter
84  for TimerEmitter<Item, S>
85{
86  fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
87  where
88    O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
89  {
90    let mut observer = subscriber.observer;
91    let item = self.item;
92    let dur = self.dur;
93
94    let handle = self.scheduler.schedule(
95      move |_| {
96        observer.next(item);
97        observer.complete();
98      },
99      Some(dur),
100      1,
101    );
102
103    subscriber.subscription.add(handle);
104  }
105}
106
// Tests for `observable::timer` and `observable::timer_at`, run against both
// a local pool (`LocalPool`) and, for several cases, a thread pool
// (`ThreadPool`) as scheduler.
#[cfg(test)]
mod tests {
  use crate::prelude::*;
  use futures::executor::{LocalPool, ThreadPool};
  use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
  use std::sync::Arc;
  use std::time::{Duration, Instant};

  // The value given to `timer` reaches the `next` callback (local pool).
  #[test]
  fn timer_shall_emit_value() {
    let mut local = LocalPool::new();

    let val = 1234;
    let i_emitted = Arc::new(AtomicI32::new(0));
    let i_emitted_c = i_emitted.clone();

    observable::timer(val, Duration::from_millis(5), local.spawner())
      .subscribe(move |n| {
        i_emitted_c.store(n, Ordering::Relaxed);
      });

    local.run();

    assert_eq!(val, i_emitted.load(Ordering::Relaxed));
  }

  // The value given to `timer` reaches the `next` callback (thread pool).
  #[test]
  fn timer_shall_emit_value_shared() {
    let pool = ThreadPool::new().unwrap();

    let val = 1234;
    let i_emitted = Arc::new(AtomicI32::new(0));
    let i_emitted_c = i_emitted.clone();

    observable::timer(val, Duration::from_millis(5), pool)
      .into_shared()
      .subscribe_blocking(move |n| {
        i_emitted_c.store(n, Ordering::Relaxed);
      });

    assert_eq!(val, i_emitted.load(Ordering::Relaxed));
  }

  // `next` is invoked exactly once (local pool).
  #[test]
  fn timer_shall_call_next_once() {
    let mut local = LocalPool::new();

    let next_count = Arc::new(AtomicUsize::new(0));
    let next_count_c = next_count.clone();

    observable::timer("aString", Duration::from_millis(5), local.spawner())
      .subscribe(move |_| {
        // Single atomic increment instead of a separate load and store.
        next_count_c.fetch_add(1, Ordering::Relaxed);
      });

    local.run();

    assert_eq!(next_count.load(Ordering::Relaxed), 1);
  }

  // `next` is invoked exactly once (thread pool).
  #[test]
  fn timer_shall_call_next_once_shared() {
    let pool = ThreadPool::new().unwrap();

    let next_count = Arc::new(AtomicUsize::new(0));
    let next_count_c = next_count.clone();

    observable::timer("aString", Duration::from_millis(5), pool)
      .into_shared()
      .subscribe_blocking(move |_| {
        // Single atomic increment instead of a separate load and store.
        next_count_c.fetch_add(1, Ordering::Relaxed);
      });

    assert_eq!(next_count.load(Ordering::Relaxed), 1);
  }

  // The complete callback is invoked (local pool).
  #[test]
  fn timer_shall_be_completed() {
    let mut local = LocalPool::new();

    let is_completed = Arc::new(AtomicBool::new(false));
    let is_completed_c = is_completed.clone();

    observable::timer("aString", Duration::from_millis(5), local.spawner())
      .subscribe_complete(
        |_| {},
        move || {
          is_completed_c.store(true, Ordering::Relaxed);
        },
      );

    local.run();

    assert!(is_completed.load(Ordering::Relaxed));
  }

  // The complete callback is invoked (thread pool).
  #[test]
  fn timer_shall_be_completed_shared() {
    let pool = ThreadPool::new().unwrap();

    let is_completed = Arc::new(AtomicBool::new(false));
    let is_completed_c = is_completed.clone();

    observable::timer("aString", Duration::from_millis(5), pool)
      .into_shared()
      .subscribe_blocking_all(
        |_| {},
        |_| {},
        move || {
          is_completed_c.store(true, Ordering::Relaxed);
        },
      );

    assert!(is_completed.load(Ordering::Relaxed));
  }

  // At least the requested duration passes before the run finishes
  // (local pool).
  #[test]
  fn timer_shall_elapse_duration() {
    let mut local = LocalPool::new();

    let duration = Duration::from_millis(50);
    let stamp = Instant::now();

    observable::timer("aString", duration, local.spawner()).subscribe(|_| {});

    local.run();

    assert!(stamp.elapsed() >= duration);
  }

  // At least the requested duration passes before the blocking subscribe
  // returns (thread pool).
  #[test]
  fn timer_shall_elapse_duration_shared() {
    let pool = ThreadPool::new().unwrap();

    let duration = Duration::from_millis(50);
    let stamp = Instant::now();

    observable::timer("aString", duration, pool)
      .into_shared()
      .subscribe_blocking(|_| {});

    assert!(stamp.elapsed() >= duration);
  }

  // The value given to `timer_at` reaches the `next` callback (local pool).
  #[test]
  fn timer_at_shall_emit_value() {
    let mut local = LocalPool::new();

    let val = 1234;
    let i_emitted = Arc::new(AtomicI32::new(0));
    let i_emitted_c = i_emitted.clone();

    observable::timer_at(
      val,
      Instant::now() + Duration::from_millis(10),
      local.spawner(),
    )
    .subscribe(move |n| {
      i_emitted_c.store(n, Ordering::Relaxed);
    });

    local.run();

    assert_eq!(val, i_emitted.load(Ordering::Relaxed));
  }

  // The value given to `timer_at` reaches the `next` callback (thread pool).
  #[test]
  fn timer_at_shall_emit_value_shared() {
    let pool = ThreadPool::new().unwrap();

    let val = 1234;
    let i_emitted = Arc::new(AtomicI32::new(0));
    let i_emitted_c = i_emitted.clone();

    observable::timer_at(val, Instant::now() + Duration::from_millis(10), pool)
      .into_shared()
      .subscribe_blocking(move |n| {
        i_emitted_c.store(n, Ordering::Relaxed);
      });

    assert_eq!(val, i_emitted.load(Ordering::Relaxed));
  }

  // `next` is invoked exactly once for `timer_at` (local pool).
  #[test]
  fn timer_at_shall_call_next_once() {
    let mut local = LocalPool::new();

    let next_count = Arc::new(AtomicUsize::new(0));
    let next_count_c = next_count.clone();

    observable::timer_at(
      "aString",
      Instant::now() + Duration::from_millis(10),
      local.spawner(),
    )
    .subscribe(move |_| {
      // Single atomic increment instead of a separate load and store.
      next_count_c.fetch_add(1, Ordering::Relaxed);
    });

    local.run();

    assert_eq!(next_count.load(Ordering::Relaxed), 1);
  }

  // The complete callback is invoked for `timer_at` (local pool).
  #[test]
  fn timer_at_shall_be_completed() {
    let mut local = LocalPool::new();

    let is_completed = Arc::new(AtomicBool::new(false));
    let is_completed_c = is_completed.clone();

    observable::timer_at(
      "aString",
      Instant::now() + Duration::from_millis(10),
      local.spawner(),
    )
    .subscribe_complete(
      |_| {},
      move || {
        is_completed_c.store(true, Ordering::Relaxed);
      },
    );

    local.run();

    assert!(is_completed.load(Ordering::Relaxed));
  }

  // With a timestamp in the future, at least the corresponding duration
  // passes before the run finishes.
  #[test]
  fn timer_at_shall_elapse_duration_with_valid_timestamp() {
    let mut local = LocalPool::new();

    let duration = Duration::from_millis(50);
    let stamp = Instant::now();
    let execute_at = stamp + duration;

    observable::timer_at("aString", execute_at, local.spawner())
      .subscribe(|_| {});

    local.run();

    assert!(stamp.elapsed() >= duration);
  }

  // With a timestamp in the past, the observable still completes and does
  // so in less than the one-second offset used for the timestamp.
  #[test]
  fn timer_at_shall_complete_with_invalid_timestamp_with_no_delay() {
    let mut local = LocalPool::new();

    let is_completed = Arc::new(AtomicBool::new(false));
    let is_completed_c = is_completed.clone();

    let duration = Duration::from_secs(1);
    let now = Instant::now();
    let execute_at = now - duration; // execute 1 sec in past

    observable::timer_at("aString", execute_at, local.spawner())
      .subscribe_complete(
        |_| {},
        move || {
          is_completed_c.store(true, Ordering::Relaxed);
        },
      );

    local.run();

    assert!(now.elapsed() < duration);
    assert!(is_completed.load(Ordering::Relaxed));
  }
}