stdweb/webapi/timer_future.rs
1use std::sync::{Arc, Mutex};
2use std::pin::Pin;
3use std::future::Future;
4use std::task::{Poll, Waker, Context};
5use webcore::once::Once;
6use webcore::value::Value;
7use futures_core::stream::Stream;
8use futures_util::FutureExt;
9use futures_channel::oneshot;
10
11
/// Converts a millisecond delay from `u32` into the `i32` that is passed on to
/// the JavaScript timer functions.
///
/// # Panics
///
/// Panics with `"ms must be less than 2147483648"` when `ms` does not fit in
/// an `i32`.
#[inline]
fn convert_to_i32( ms: u32 ) -> i32 {
    // Validate the upper bound first so the cast below can never wrap around
    // to a negative number.
    assert!( ms <= i32::max_value() as u32, "ms must be less than 2147483648" );

    ms as i32
}
20
21
22/// The [`Future`](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/future/trait.Future.html)
23/// which is returned by [`wait`](fn.wait.html).
24// This isn't implemented as a PromiseFuture because Promises do not support cancellation
25#[derive( Debug )]
26pub struct Wait {
27 receiver: oneshot::Receiver< () >,
28 timer: Value,
29}
30
31impl Wait {
32 fn new( ms: u32 ) -> Self {
33 // We accept a u32 because we don't want negative values, however setTimeout requires it to be i32
34 let ms = convert_to_i32( ms );
35
36 let ( sender, receiver ) = oneshot::channel();
37
38 let callback = move || {
39 // TODO is this correct ?
40 match sender.send( () ) {
41 Ok( _ ) => {},
42 Err( _ ) => {},
43 };
44 };
45
46 let timer = js!(
47 var callback = @{Once( callback )};
48
49 return {
50 callback: callback,
51 id: setTimeout( function () {
52 callback();
53 }, @{ms} )
54 };
55 );
56
57 Self {
58 receiver,
59 timer,
60 }
61 }
62}
63
64impl Future for Wait {
65 type Output = ();
66
67 #[inline]
68 fn poll( mut self: Pin< &mut Self >, cx: &mut Context ) -> Poll< Self::Output > {
69 // TODO is this unwrap correct ?
70 self.receiver.poll_unpin( cx ).map( |x| x.unwrap() )
71 }
72}
73
74impl Drop for Wait {
75 #[inline]
76 fn drop( &mut self ) {
77 js! { @(no_return)
78 var timer = @{&self.timer};
79 clearTimeout( timer.id );
80 timer.callback.drop();
81 }
82 }
83}
84
85/// Creates a [`Future`](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/future/trait.Future.html)
86/// which will return `()` after `ms` milliseconds have passed.
87///
88/// It might return a long time *after* `ms` milliseconds have passed, but it
89/// will never return *before* `ms` milliseconds have passed.
90///
91/// [(JavaScript docs)](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/setTimeout)
92// https://html.spec.whatwg.org/multipage/webappapis.html#dom-settimeout
93#[inline]
94pub fn wait( ms: u32 ) -> Wait {
95 Wait::new( ms )
96}
97
98
99#[derive( Debug )]
100struct IntervalBufferedState {
101 waker: Option< Waker >,
102 count: usize,
103}
104
105/// The [`Stream`](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/stream/trait.Stream.html)
106/// which is returned by [`interval_buffered`](fn.interval_buffered.html).
107#[derive( Debug )]
108pub struct IntervalBuffered {
109 state: Arc< Mutex< IntervalBufferedState > >,
110 timer: Value,
111}
112
113impl IntervalBuffered {
114 fn new( ms: u32 ) -> Self {
115 // We accept a u32 because we don't want negative values, however setInterval requires it to be i32
116 let ms = convert_to_i32( ms );
117
118 let state = Arc::new( Mutex::new( IntervalBufferedState {
119 waker: None,
120 count: 0,
121 } ) );
122
123 let callback = {
124 let state = state.clone();
125
126 move || {
127 let mut lock = state.lock().unwrap();
128
129 lock.count += 1;
130
131 if let Some( waker ) = lock.waker.take() {
132 drop( lock );
133 waker.wake();
134 }
135 }
136 };
137
138 let timer = js!(
139 var callback = @{callback};
140
141 return {
142 callback: callback,
143 id: setInterval( function () {
144 callback();
145 }, @{ms} )
146 };
147 );
148
149 Self {
150 state,
151 timer,
152 }
153 }
154}
155
156impl Stream for IntervalBuffered {
157 type Item = ();
158
159 fn poll_next( self: Pin< &mut Self >, cx: &mut Context ) -> Poll< Option< Self::Item > > {
160 let mut lock = self.state.lock().unwrap();
161
162 if lock.count == 0 {
163 lock.waker = Some( cx.waker().clone() );
164 Poll::Pending
165
166 } else {
167 lock.count -= 1;
168
169 Poll::Ready( Some( () ) )
170 }
171 }
172}
173
174impl Drop for IntervalBuffered {
175 #[inline]
176 fn drop( &mut self ) {
177 js! { @(no_return)
178 var timer = @{&self.timer};
179 clearInterval( timer.id );
180 timer.callback.drop();
181 }
182 }
183}
184
185/// Creates a [`Stream`](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/futures/stream/trait.Stream.html)
186/// which will continuously output `()` every `ms` milliseconds, until it is dropped.
187///
188/// It might output `()` a long time *after* `ms` milliseconds have passed, but it
189/// will never output `()` *before* `ms` milliseconds have passed.
190///
191/// If the consumer isn't ready to receive the `()`, it will be put into a queue
192/// (this queue is ***very*** fast, it can handle a very large number of elements).
193///
194/// When the consumer is ready, it will output all of the `()` from the queue.
195///
196/// That means that if the consumer is too slow, it might receive multiple `()` at the same time.
197/// Or it might receive another `()` before `ms` milliseconds have passed for the consumer
198/// (because `ms` milliseconds *have* passed for the [`IntervalBuffered`](struct.IntervalBuffered.html)).
199///
200/// [(JavaScript docs)](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/setInterval)
201// https://html.spec.whatwg.org/multipage/webappapis.html#dom-setinterval
202#[inline]
203pub fn interval_buffered( ms: u32 ) -> IntervalBuffered {
204 IntervalBuffered::new( ms )
205}