1use crate::prelude::*;
2use std::time::{Duration, Instant};
3
4pub fn timer<Item, S>(
7 item: Item,
8 dur: Duration,
9 scheduler: S,
10) -> ObservableBase<TimerEmitter<Item, S>> {
11 ObservableBase::new(TimerEmitter {
12 item,
13 dur,
14 scheduler,
15 })
16}
17
18pub fn timer_at<Item, S>(
23 item: Item,
24 at: Instant,
25 scheduler: S,
26) -> ObservableBase<TimerEmitter<Item, S>> {
27 let duration = get_duration_from_instant(at);
28 ObservableBase::new(TimerEmitter {
29 item,
30 dur: duration,
31 scheduler,
32 })
33}
34
35fn get_duration_from_instant(instant: Instant) -> Duration {
38 let now = Instant::now();
39 match instant > now {
40 true => instant - now,
41 false => Duration::default(),
42 }
43}
44
45pub struct TimerEmitter<Item, S> {
49 item: Item,
50 dur: Duration,
51 scheduler: S,
52}
53
54impl<Item, S> Emitter for TimerEmitter<Item, S> {
55 type Item = Item;
56 type Err = ();
57}
58
59impl<Item: 'static, S: LocalScheduler + 'static> LocalEmitter<'static>
60 for TimerEmitter<Item, S>
61{
62 fn emit<O>(self, subscriber: Subscriber<O, LocalSubscription>)
63 where
64 O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
65 {
66 let mut observer = subscriber.observer;
67 let item = self.item;
68 let dur = self.dur;
69
70 let handle = self.scheduler.schedule(
71 move |_| {
72 observer.next(item);
73 observer.complete();
74 },
75 Some(dur),
76 1,
77 );
78
79 subscriber.subscription.add(handle);
80 }
81}
82
83impl<Item: Send + 'static, S: SharedScheduler + 'static> SharedEmitter
84 for TimerEmitter<Item, S>
85{
86 fn emit<O>(self, subscriber: Subscriber<O, SharedSubscription>)
87 where
88 O: Observer<Item = Self::Item, Err = Self::Err> + Send + Sync + 'static,
89 {
90 let mut observer = subscriber.observer;
91 let item = self.item;
92 let dur = self.dur;
93
94 let handle = self.scheduler.schedule(
95 move |_| {
96 observer.next(item);
97 observer.complete();
98 },
99 Some(dur),
100 1,
101 );
102
103 subscriber.subscription.add(handle);
104 }
105}
106
107#[cfg(test)]
108mod tests {
109 use crate::prelude::*;
110 use futures::executor::{LocalPool, ThreadPool};
111 use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
112 use std::sync::Arc;
113 use std::time::{Duration, Instant};
114
115 #[test]
116 fn timer_shall_emit_value() {
117 let mut local = LocalPool::new();
118
119 let val = 1234;
120 let i_emitted = Arc::new(AtomicI32::new(0));
121 let i_emitted_c = i_emitted.clone();
122
123 observable::timer(val, Duration::from_millis(5), local.spawner())
124 .subscribe(move |n| {
125 i_emitted_c.store(n, Ordering::Relaxed);
126 });
127
128 local.run();
129
130 assert_eq!(val, i_emitted.load(Ordering::Relaxed));
131 }
132
133 #[test]
134 fn timer_shall_emit_value_shared() {
135 let pool = ThreadPool::new().unwrap();
136
137 let val = 1234;
138 let i_emitted = Arc::new(AtomicI32::new(0));
139 let i_emitted_c = i_emitted.clone();
140
141 observable::timer(val, Duration::from_millis(5), pool)
142 .into_shared()
143 .subscribe_blocking(move |n| {
144 i_emitted_c.store(n, Ordering::Relaxed);
145 });
146
147 assert_eq!(val, i_emitted.load(Ordering::Relaxed));
148 }
149
150 #[test]
151 fn timer_shall_call_next_once() {
152 let mut local = LocalPool::new();
153
154 let next_count = Arc::new(AtomicUsize::new(0));
155 let next_count_c = next_count.clone();
156
157 observable::timer("aString", Duration::from_millis(5), local.spawner())
158 .subscribe(move |_| {
159 let count = next_count_c.load(Ordering::Relaxed);
160 next_count_c.store(count + 1, Ordering::Relaxed);
161 });
162
163 local.run();
164
165 assert_eq!(next_count.load(Ordering::Relaxed), 1);
166 }
167
168 #[test]
169 fn timer_shall_call_next_once_shared() {
170 let pool = ThreadPool::new().unwrap();
171
172 let next_count = Arc::new(AtomicUsize::new(0));
173 let next_count_c = next_count.clone();
174
175 observable::timer("aString", Duration::from_millis(5), pool)
176 .into_shared()
177 .subscribe_blocking(move |_| {
178 let count = next_count_c.load(Ordering::Relaxed);
179 next_count_c.store(count + 1, Ordering::Relaxed);
180 });
181
182 assert_eq!(next_count.load(Ordering::Relaxed), 1);
183 }
184
185 #[test]
186 fn timer_shall_be_completed() {
187 let mut local = LocalPool::new();
188
189 let is_completed = Arc::new(AtomicBool::new(false));
190 let is_completed_c = is_completed.clone();
191
192 observable::timer("aString", Duration::from_millis(5), local.spawner())
193 .subscribe_complete(
194 |_| {},
195 move || {
196 is_completed_c.store(true, Ordering::Relaxed);
197 },
198 );
199
200 local.run();
201
202 assert!(is_completed.load(Ordering::Relaxed));
203 }
204
205 #[test]
206 fn timer_shall_be_completed_shared() {
207 let pool = ThreadPool::new().unwrap();
208
209 let is_completed = Arc::new(AtomicBool::new(false));
210 let is_completed_c = is_completed.clone();
211
212 observable::timer("aString", Duration::from_millis(5), pool)
213 .into_shared()
214 .subscribe_blocking_all(
215 |_| {},
216 |_| {},
217 move || {
218 is_completed_c.store(true, Ordering::Relaxed);
219 },
220 );
221
222 assert!(is_completed.load(Ordering::Relaxed));
223 }
224
225 #[test]
226 fn timer_shall_elapse_duration() {
227 let mut local = LocalPool::new();
228
229 let duration = Duration::from_millis(50);
230 let stamp = Instant::now();
231
232 observable::timer("aString", duration, local.spawner()).subscribe(|_| {});
233
234 local.run();
235
236 assert!(stamp.elapsed() >= duration);
237 }
238
239 #[test]
240 fn timer_shall_elapse_duration_shared() {
241 let pool = ThreadPool::new().unwrap();
242
243 let duration = Duration::from_millis(50);
244 let stamp = Instant::now();
245
246 observable::timer("aString", duration, pool)
247 .into_shared()
248 .subscribe_blocking(|_| {});
249
250 assert!(stamp.elapsed() >= duration);
251 }
252
253 #[test]
254 fn timer_at_shall_emit_value() {
255 let mut local = LocalPool::new();
256
257 let val = 1234;
258 let i_emitted = Arc::new(AtomicI32::new(0));
259 let i_emitted_c = i_emitted.clone();
260
261 observable::timer_at(
262 val,
263 Instant::now() + Duration::from_millis(10),
264 local.spawner(),
265 )
266 .subscribe(move |n| {
267 i_emitted_c.store(n, Ordering::Relaxed);
268 });
269
270 local.run();
271
272 assert_eq!(val, i_emitted.load(Ordering::Relaxed));
273 }
274
275 #[test]
276 fn timer_at_shall_emit_value_shared() {
277 let pool = ThreadPool::new().unwrap();
278
279 let val = 1234;
280 let i_emitted = Arc::new(AtomicI32::new(0));
281 let i_emitted_c = i_emitted.clone();
282
283 observable::timer_at(val, Instant::now() + Duration::from_millis(10), pool)
284 .into_shared()
285 .subscribe_blocking(move |n| {
286 i_emitted_c.store(n, Ordering::Relaxed);
287 });
288
289 assert_eq!(val, i_emitted.load(Ordering::Relaxed));
290 }
291
292 #[test]
293 fn timer_at_shall_call_next_once() {
294 let mut local = LocalPool::new();
295
296 let next_count = Arc::new(AtomicUsize::new(0));
297 let next_count_c = next_count.clone();
298
299 observable::timer_at(
300 "aString",
301 Instant::now() + Duration::from_millis(10),
302 local.spawner(),
303 )
304 .subscribe(move |_| {
305 let count = next_count_c.load(Ordering::Relaxed);
306 next_count_c.store(count + 1, Ordering::Relaxed);
307 });
308
309 local.run();
310
311 assert_eq!(next_count.load(Ordering::Relaxed), 1);
312 }
313
314 #[test]
315 fn timer_at_shall_be_completed() {
316 let mut local = LocalPool::new();
317
318 let is_completed = Arc::new(AtomicBool::new(false));
319 let is_completed_c = is_completed.clone();
320
321 observable::timer_at(
322 "aString",
323 Instant::now() + Duration::from_millis(10),
324 local.spawner(),
325 )
326 .subscribe_complete(
327 |_| {},
328 move || {
329 is_completed_c.store(true, Ordering::Relaxed);
330 },
331 );
332
333 local.run();
334
335 assert!(is_completed.load(Ordering::Relaxed));
336 }
337
338 #[test]
339 fn timer_at_shall_elapse_duration_with_valid_timestamp() {
340 let mut local = LocalPool::new();
341
342 let duration = Duration::from_millis(50);
343 let stamp = Instant::now();
344 let execute_at = stamp + duration;
345
346 observable::timer_at("aString", execute_at, local.spawner())
347 .subscribe(|_| {});
348
349 local.run();
350
351 assert!(stamp.elapsed() >= duration);
352 }
353
354 #[test]
355 fn timer_at_shall_complete_with_invalid_timestamp_with_no_delay() {
356 let mut local = LocalPool::new();
357
358 let is_completed = Arc::new(AtomicBool::new(false));
359 let is_completed_c = is_completed.clone();
360
361 let duration = Duration::from_secs(1);
362 let now = Instant::now();
363 let execute_at = now - duration; observable::timer_at("aString", execute_at, local.spawner())
366 .subscribe_complete(
367 |_| {},
368 move || {
369 is_completed_c.store(true, Ordering::Relaxed);
370 },
371 );
372
373 local.run();
374
375 assert!(now.elapsed() < duration);
376 assert!(is_completed.load(Ordering::Relaxed));
377 }
378}