wasm_timer/
timer.rs

1// The `timer` module is a copy-paste from the code of `futures-timer`, but
2// adjusted for WASM.
3// 
4// Copyright (c) 2014 Alex Crichton
5// 
6// Permission is hereby granted, free of charge, to any
7// person obtaining a copy of this software and associated
8// documentation files (the "Software"), to deal in the
9// Software without restriction, including without
10// limitation the rights to use, copy, modify, merge,
11// publish, distribute, sublicense, and/or sell copies of
12// the Software, and to permit persons to whom the Software
13// is furnished to do so, subject to the following
14// conditions:
15// 
16// The above copyright notice and this permission notice
17// shall be included in all copies or substantial portions
18// of the Software.
19// 
20// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
21// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
22// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
23// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
24// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
25// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
26// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
27// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
28// DEALINGS IN THE SOFTWARE.
29// 
30                              // Apache License
31                        // Version 2.0, January 2004
32                     // http://www.apache.org/licenses/
33// 
34// TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
35// 
36// 1. Definitions.
37// 
38   // "License" shall mean the terms and conditions for use, reproduction,
39   // and distribution as defined by Sections 1 through 9 of this document.
40// 
41   // "Licensor" shall mean the copyright owner or entity authorized by
42   // the copyright owner that is granting the License.
43// 
44   // "Legal Entity" shall mean the union of the acting entity and all
45   // other entities that control, are controlled by, or are under common
46   // control with that entity. For the purposes of this definition,
47   // "control" means (i) the power, direct or indirect, to cause the
48   // direction or management of such entity, whether by contract or
49   // otherwise, or (ii) ownership of fifty percent (50%) or more of the
50   // outstanding shares, or (iii) beneficial ownership of such entity.
51// 
52   // "You" (or "Your") shall mean an individual or Legal Entity
53   // exercising permissions granted by this License.
54// 
55   // "Source" form shall mean the preferred form for making modifications,
56   // including but not limited to software source code, documentation
57   // source, and configuration files.
58// 
59   // "Object" form shall mean any form resulting from mechanical
60   // transformation or translation of a Source form, including but
61   // not limited to compiled object code, generated documentation,
62   // and conversions to other media types.
63// 
64   // "Work" shall mean the work of authorship, whether in Source or
65   // Object form, made available under the License, as indicated by a
66   // copyright notice that is included in or attached to the work
67   // (an example is provided in the Appendix below).
68// 
69   // "Derivative Works" shall mean any work, whether in Source or Object
70   // form, that is based on (or derived from) the Work and for which the
71   // editorial revisions, annotations, elaborations, or other modifications
72   // represent, as a whole, an original work of authorship. For the purposes
73   // of this License, Derivative Works shall not include works that remain
74   // separable from, or merely link (or bind by name) to the interfaces of,
75   // the Work and Derivative Works thereof.
76// 
77   // "Contribution" shall mean any work of authorship, including
78   // the original version of the Work and any modifications or additions
79   // to that Work or Derivative Works thereof, that is intentionally
80   // submitted to Licensor for inclusion in the Work by the copyright owner
81   // or by an individual or Legal Entity authorized to submit on behalf of
82   // the copyright owner. For the purposes of this definition, "submitted"
83   // means any form of electronic, verbal, or written communication sent
84   // to the Licensor or its representatives, including but not limited to
85   // communication on electronic mailing lists, source code control systems,
86   // and issue tracking systems that are managed by, or on behalf of, the
87   // Licensor for the purpose of discussing and improving the Work, but
88   // excluding communication that is conspicuously marked or otherwise
89   // designated in writing by the copyright owner as "Not a Contribution."
90// 
91   // "Contributor" shall mean Licensor and any individual or Legal Entity
92   // on behalf of whom a Contribution has been received by Licensor and
93   // subsequently incorporated within the Work.
94// 
95// 2. Grant of Copyright License. Subject to the terms and conditions of
96   // this License, each Contributor hereby grants to You a perpetual,
97   // worldwide, non-exclusive, no-charge, royalty-free, irrevocable
98   // copyright license to reproduce, prepare Derivative Works of,
99   // publicly display, publicly perform, sublicense, and distribute the
100   // Work and such Derivative Works in Source or Object form.
101// 
102// 3. Grant of Patent License. Subject to the terms and conditions of
103   // this License, each Contributor hereby grants to You a perpetual,
104   // worldwide, non-exclusive, no-charge, royalty-free, irrevocable
105   // (except as stated in this section) patent license to make, have made,
106   // use, offer to sell, sell, import, and otherwise transfer the Work,
107   // where such license applies only to those patent claims licensable
108   // by such Contributor that are necessarily infringed by their
109   // Contribution(s) alone or by combination of their Contribution(s)
110   // with the Work to which such Contribution(s) was submitted. If You
111   // institute patent litigation against any entity (including a
112   // cross-claim or counterclaim in a lawsuit) alleging that the Work
113   // or a Contribution incorporated within the Work constitutes direct
114   // or contributory patent infringement, then any patent licenses
115   // granted to You under this License for that Work shall terminate
116   // as of the date such litigation is filed.
117// 
118// 4. Redistribution. You may reproduce and distribute copies of the
119   // Work or Derivative Works thereof in any medium, with or without
120   // modifications, and in Source or Object form, provided that You
121   // meet the following conditions:
122// 
123   // (a) You must give any other recipients of the Work or
124       // Derivative Works a copy of this License; and
125// 
126   // (b) You must cause any modified files to carry prominent notices
127       // stating that You changed the files; and
128// 
129   // (c) You must retain, in the Source form of any Derivative Works
130       // that You distribute, all copyright, patent, trademark, and
131       // attribution notices from the Source form of the Work,
132       // excluding those notices that do not pertain to any part of
133       // the Derivative Works; and
134// 
135   // (d) If the Work includes a "NOTICE" text file as part of its
136       // distribution, then any Derivative Works that You distribute must
137       // include a readable copy of the attribution notices contained
138       // within such NOTICE file, excluding those notices that do not
139       // pertain to any part of the Derivative Works, in at least one
140       // of the following places: within a NOTICE text file distributed
141       // as part of the Derivative Works; within the Source form or
142       // documentation, if provided along with the Derivative Works; or,
143       // within a display generated by the Derivative Works, if and
144       // wherever such third-party notices normally appear. The contents
145       // of the NOTICE file are for informational purposes only and
146       // do not modify the License. You may add Your own attribution
147       // notices within Derivative Works that You distribute, alongside
148       // or as an addendum to the NOTICE text from the Work, provided
149       // that such additional attribution notices cannot be construed
150       // as modifying the License.
151// 
152   // You may add Your own copyright statement to Your modifications and
153   // may provide additional or different license terms and conditions
154   // for use, reproduction, or distribution of Your modifications, or
155   // for any such Derivative Works as a whole, provided Your use,
156   // reproduction, and distribution of the Work otherwise complies with
157   // the conditions stated in this License.
158// 
159// 5. Submission of Contributions. Unless You explicitly state otherwise,
160   // any Contribution intentionally submitted for inclusion in the Work
161   // by You to the Licensor shall be under the terms and conditions of
162   // this License, without any additional terms or conditions.
163   // Notwithstanding the above, nothing herein shall supersede or modify
164   // the terms of any separate license agreement you may have executed
165   // with Licensor regarding such Contributions.
166// 
167// 6. Trademarks. This License does not grant permission to use the trade
168   // names, trademarks, service marks, or product names of the Licensor,
169   // except as required for reasonable and customary use in describing the
170   // origin of the Work and reproducing the content of the NOTICE file.
171// 
172// 7. Disclaimer of Warranty. Unless required by applicable law or
173   // agreed to in writing, Licensor provides the Work (and each
174   // Contributor provides its Contributions) on an "AS IS" BASIS,
175   // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
176   // implied, including, without limitation, any warranties or conditions
177   // of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
178   // PARTICULAR PURPOSE. You are solely responsible for determining the
179   // appropriateness of using or redistributing the Work and assume any
180   // risks associated with Your exercise of permissions under this License.
181// 
182// 8. Limitation of Liability. In no event and under no legal theory,
183   // whether in tort (including negligence), contract, or otherwise,
184   // unless required by applicable law (such as deliberate and grossly
185   // negligent acts) or agreed to in writing, shall any Contributor be
186   // liable to You for damages, including any direct, indirect, special,
187   // incidental, or consequential damages of any character arising as a
188   // result of this License or out of the use or inability to use the
189   // Work (including but not limited to damages for loss of goodwill,
190   // work stoppage, computer failure or malfunction, or any and all
191   // other commercial damages or losses), even if such Contributor
192   // has been advised of the possibility of such damages.
193// 
194// 9. Accepting Warranty or Additional Liability. While redistributing
195   // the Work or Derivative Works thereof, You may choose to offer,
196   // and charge a fee for, acceptance of support, warranty, indemnity,
197   // or other liability obligations and/or rights consistent with this
198   // License. However, in accepting such obligations, You may act only
199   // on Your own behalf and on Your sole responsibility, not on behalf
200   // of any other Contributor, and only if You agree to indemnify,
201   // defend, and hold each Contributor harmless for any liability
202   // incurred by, or claims asserted against, such Contributor by reason
203   // of your accepting any such warranty or additional liability.
204// 
205// END OF TERMS AND CONDITIONS
206// 
207// APPENDIX: How to apply the Apache License to your work.
208// 
209   // To apply the Apache License to your work, attach the following
210   // boilerplate notice, with the fields enclosed by brackets "[]"
211   // replaced with your own identifying information. (Don't include
212   // the brackets!)  The text should be enclosed in the appropriate
213   // comment syntax for the file format. We also recommend that a
214   // file or class name and description of purpose be included on the
215   // same "printed page" as the copyright notice for easier
216   // identification within third-party archives.
217// 
218// Copyright [yyyy] [name of copyright owner]
219// 
220// Licensed under the Apache License, Version 2.0 (the "License");
221// you may not use this file except in compliance with the License.
222// You may obtain a copy of the License at
223// 
224	// http://www.apache.org/licenses/LICENSE-2.0
225// 
226// Unless required by applicable law or agreed to in writing, software
227// distributed under the License is distributed on an "AS IS" BASIS,
228// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
229// See the License for the specific language governing permissions and
230// limitations under the License.
231
232use crate::Instant;
233use std::cmp::Ordering;
234use std::mem;
235use std::pin::Pin;
236use std::sync::atomic::AtomicUsize;
237use std::sync::atomic::Ordering::SeqCst;
238use std::sync::{Arc, Mutex, Weak};
239use std::task::{Context, Poll};
240use std::fmt;
241
242use futures::prelude::*;
243use futures::task::AtomicWaker;
244
245use arc_list::{ArcList, Node};
246use heap::{Heap, Slot};
247
248mod arc_list;
249mod global;
250mod heap;
251
252pub mod ext;
253pub use ext::{TryFutureExt, TryStreamExt};
254
255/// A "timer heap" used to power separately owned instances of `Delay` and
256/// `Interval`.
257///
258/// This timer is implemented as a priority queued-based heap. Each `Timer`
259/// contains a few primary methods which which to drive it:
260///
261/// * `next_wake` indicates how long the ambient system needs to sleep until it
262///   invokes further processing on a `Timer`
263/// * `advance_to` is what actually fires timers on the `Timer`, and should be
264///   called essentially every iteration of the event loop, or when the time
265///   specified by `next_wake` has elapsed.
266/// * The `Future` implementation for `Timer` is used to process incoming timer
267///   updates and requests. This is used to schedule new timeouts, update
268///   existing ones, or delete existing timeouts. The `Future` implementation
269///   will never resolve, but it'll schedule notifications of when to wake up
270///   and process more messages.
271///
272/// Note that if you're using this crate you probably don't need to use a
273/// `Timer` as there is a global one already available for you run on a helper
274/// thread. If this isn't desirable, though, then the
275/// `TimerHandle::set_fallback` method can be used instead!
276pub struct Timer {
277    inner: Arc<Inner>,
278    timer_heap: Heap<HeapTimer>,
279}
280
281/// A handle to a `Timer` which is used to create instances of a `Delay`.
282#[derive(Clone)]
283pub struct TimerHandle {
284    inner: Weak<Inner>,
285}
286
287mod delay;
288mod interval;
289pub use self::delay::Delay;
290pub use self::interval::Interval;
291
292struct Inner {
293    /// List of updates the `Timer` needs to process
294    list: ArcList<ScheduledTimer>,
295
296    /// The blocked `Timer` task to receive notifications to the `list` above.
297    waker: AtomicWaker,
298}
299
300/// Shared state between the `Timer` and a `Delay`.
301struct ScheduledTimer {
302    waker: AtomicWaker,
303
304    // The lowest bit here is whether the timer has fired or not, the second
305    // lowest bit is whether the timer has been invalidated, and all the other
306    // bits are the "generation" of the timer which is reset during the `reset`
307    // function. Only timers for a matching generation are fired.
308    state: AtomicUsize,
309
310    inner: Weak<Inner>,
311    at: Mutex<Option<Instant>>,
312
313    // TODO: this is only accessed by the timer thread, should have a more
314    // lightweight protection than a `Mutex`
315    slot: Mutex<Option<Slot>>,
316}
317
318/// Entries in the timer heap, sorted by the instant they're firing at and then
319/// also containing some payload data.
320struct HeapTimer {
321    at: Instant,
322    gen: usize,
323    node: Arc<Node<ScheduledTimer>>,
324}
325
326impl Timer {
327    /// Creates a new timer heap ready to create new timers.
328    pub fn new() -> Timer {
329        Timer {
330            inner: Arc::new(Inner {
331                list: ArcList::new(),
332                waker: AtomicWaker::new(),
333            }),
334            timer_heap: Heap::new(),
335        }
336    }
337
338    /// Returns a handle to this timer heap, used to create new timeouts.
339    pub fn handle(&self) -> TimerHandle {
340        TimerHandle {
341            inner: Arc::downgrade(&self.inner),
342        }
343    }
344
345    /// Returns the time at which this timer next needs to be invoked with
346    /// `advance_to`.
347    ///
348    /// Event loops or threads typically want to sleep until the specified
349    /// instant.
350    pub fn next_event(&self) -> Option<Instant> {
351        self.timer_heap.peek().map(|t| t.at)
352    }
353
354    /// Proces any timers which are supposed to fire at or before the current
355    /// instant.
356    ///
357    /// This method is equivalent to `self.advance_to(Instant::now())`.
358    pub fn advance(&mut self) {
359        self.advance_to(Instant::now())
360    }
361
362    /// Proces any timers which are supposed to fire before `now` specified.
363    ///
364    /// This method should be called on `Timer` periodically to advance the
365    /// internal state and process any pending timers which need to fire.
366    pub fn advance_to(&mut self, now: Instant) {
367        loop {
368            match self.timer_heap.peek() {
369                Some(head) if head.at <= now => {}
370                Some(_) => break,
371                None => break,
372            };
373
374            // Flag the timer as fired and then notify its task, if any, that's
375            // blocked.
376            let heap_timer = self.timer_heap.pop().unwrap();
377            *heap_timer.node.slot.lock().unwrap() = None;
378            let bits = heap_timer.gen << 2;
379            match heap_timer
380                .node
381                .state
382                .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
383            {
384                Ok(_) => heap_timer.node.waker.wake(),
385                Err(_b) => {}
386            }
387        }
388    }
389
390    /// Either updates the timer at slot `idx` to fire at `at`, or adds a new
391    /// timer at `idx` and sets it to fire at `at`.
392    fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
393        // TODO: avoid remove + push and instead just do one sift of the heap?
394        // In theory we could update it in place and then do the percolation
395        // as necessary
396        let gen = node.state.load(SeqCst) >> 2;
397        let mut slot = node.slot.lock().unwrap();
398        if let Some(heap_slot) = slot.take() {
399            self.timer_heap.remove(heap_slot);
400        }
401        *slot = Some(self.timer_heap.push(HeapTimer {
402            at: at,
403            gen: gen,
404            node: node.clone(),
405        }));
406    }
407
408    fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
409        // If this `idx` is still around and it's still got a registered timer,
410        // then we jettison it form the timer heap.
411        let mut slot = node.slot.lock().unwrap();
412        let heap_slot = match slot.take() {
413            Some(slot) => slot,
414            None => return,
415        };
416        self.timer_heap.remove(heap_slot);
417    }
418
419    fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
420        node.state.fetch_or(0b10, SeqCst);
421        node.waker.wake();
422    }
423}
424
425impl Future for Timer {
426    type Output = ();
427
428    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
429        Pin::new(&mut self.inner).waker.register(cx.waker());
430        let mut list = self.inner.list.take();
431        while let Some(node) = list.pop() {
432            let at = *node.at.lock().unwrap();
433            match at {
434                Some(at) => self.update_or_add(at, node),
435                None => self.remove(node),
436            }
437        }
438        Poll::Pending
439    }
440}
441
442impl Drop for Timer {
443    fn drop(&mut self) {
444        // Seal off our list to prevent any more updates from getting pushed on.
445        // Any timer which sees an error from the push will immediately become
446        // inert.
447        let mut list = self.inner.list.take_and_seal();
448
449        // Now that we'll never receive another timer, drain the list of all
450        // updates and also drain our heap of all active timers, invalidating
451        // everything.
452        while let Some(t) = list.pop() {
453            self.invalidate(t);
454        }
455        while let Some(t) = self.timer_heap.pop() {
456            self.invalidate(t.node);
457        }
458    }
459}
460
461impl fmt::Debug for Timer {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
463        f.debug_struct("Timer").field("heap", &"...").finish()
464    }
465}
466
467impl PartialEq for HeapTimer {
468    fn eq(&self, other: &HeapTimer) -> bool {
469        self.at == other.at
470    }
471}
472
473impl Eq for HeapTimer {}
474
475impl PartialOrd for HeapTimer {
476    fn partial_cmp(&self, other: &HeapTimer) -> Option<Ordering> {
477        Some(self.cmp(other))
478    }
479}
480
481impl Ord for HeapTimer {
482    fn cmp(&self, other: &HeapTimer) -> Ordering {
483        self.at.cmp(&other.at)
484    }
485}
486
487static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
488
489/// Error returned from `TimerHandle::set_fallback`.
490#[derive(Clone, Debug)]
491pub struct SetDefaultError(());
492
493impl TimerHandle {
494    /// Configures this timer handle to be the one returned by
495    /// `TimerHandle::default`.
496    ///
497    /// By default a global thread is initialized on the first call to
498    /// `TimerHandle::default`. This first call can happen transitively through
499    /// `Delay::new`. If, however, that hasn't happened yet then the global
500    /// default timer handle can be configured through this method.
501    ///
502    /// This method can be used to prevent the global helper thread from
503    /// spawning. If this method is successful then the global helper thread
504    /// will never get spun up.
505    ///
506    /// On success this timer handle will have installed itself globally to be
507    /// used as the return value for `TimerHandle::default` unless otherwise
508    /// specified.
509    ///
510    /// # Errors
511    ///
512    /// If another thread has already called `set_as_global_fallback` or this
513    /// thread otherwise loses a race to call this method then it will fail
514    /// returning an error. Once a call to `set_as_global_fallback` is
515    /// successful then no future calls may succeed.
516    pub fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
517        unsafe {
518            let val = self.into_usize();
519            match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
520                Ok(_) => Ok(()),
521                Err(_) => {
522                    drop(TimerHandle::from_usize(val));
523                    Err(SetDefaultError(()))
524                }
525            }
526        }
527    }
528
529    fn into_usize(self) -> usize {
530        unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
531    }
532
533    unsafe fn from_usize(val: usize) -> TimerHandle {
534        let inner = mem::transmute::<usize, Weak<Inner>>(val);
535        TimerHandle { inner }
536    }
537}
538
539impl Default for TimerHandle {
540    #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
541    fn default() -> TimerHandle {
542        let mut fallback = HANDLE_FALLBACK.load(SeqCst);
543
544        // If the fallback hasn't been previously initialized then let's spin
545        // up a helper thread and try to initialize with that. If we can't
546        // actually create a helper thread then we'll just return a "defunkt"
547        // handle which will return errors when timer objects are attempted to
548        // be associated.
549        if fallback == 0 {
550            let helper = match global::HelperThread::new() {
551                Ok(helper) => helper,
552                Err(_) => return TimerHandle { inner: Weak::new() },
553            };
554
555            // If we successfully set ourselves as the actual fallback then we
556            // want to `forget` the helper thread to ensure that it persists
557            // globally. If we fail to set ourselves as the fallback that means
558            // that someone was racing with this call to
559            // `TimerHandle::default`.  They ended up winning so we'll destroy
560            // our helper thread (which shuts down the thread) and reload the
561            // fallback.
562            if helper.handle().set_as_global_fallback().is_ok() {
563                let ret = helper.handle();
564                helper.forget();
565                return ret;
566            }
567            fallback = HANDLE_FALLBACK.load(SeqCst);
568        }
569
570        // At this point our fallback handle global was configured so we use
571        // its value to reify a handle, clone it, and then forget our reified
572        // handle as we don't actually have an owning reference to it.
573        assert!(fallback != 0);
574        unsafe {
575            let handle = TimerHandle::from_usize(fallback);
576            let ret = handle.clone();
577            drop(handle.into_usize());
578            return ret;
579        }
580    }
581
582    #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
583    fn default() -> TimerHandle {
584        let mut fallback = HANDLE_FALLBACK.load(SeqCst);
585
586        // If the fallback hasn't been previously initialized then let's spin
587        // up a helper thread and try to initialize with that. If we can't
588        // actually create a helper thread then we'll just return a "defunkt"
589        // handle which will return errors when timer objects are attempted to
590        // be associated.
591        if fallback == 0 {
592            let handle = global::run();
593
594            // If we successfully set ourselves as the actual fallback then we
595            // want to `forget` the helper thread to ensure that it persists
596            // globally. If we fail to set ourselves as the fallback that means
597            // that someone was racing with this call to
598            // `TimerHandle::default`.  They ended up winning so we'll destroy
599            // our helper thread (which shuts down the thread) and reload the
600            // fallback.
601            if handle.clone().set_as_global_fallback().is_ok() {
602                return handle;
603            }
604            fallback = HANDLE_FALLBACK.load(SeqCst);
605        }
606
607        // At this point our fallback handle global was configured so we use
608        // its value to reify a handle, clone it, and then forget our reified
609        // handle as we don't actually have an owning reference to it.
610        assert!(fallback != 0);
611        unsafe {
612            let handle = TimerHandle::from_usize(fallback);
613            let ret = handle.clone();
614            drop(handle.into_usize());
615            return ret;
616        }
617    }
618}
619
620impl fmt::Debug for TimerHandle {
621    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
622        f.debug_struct("TimerHandle").field("inner", &"...").finish()
623    }
624}
625