workflow_core/native/
interval.rs

1//!
2//! `Interval` stream backed by the Tokio `Interval` stream.
3//!
4
5#![allow(dead_code)]
6
7use crate::channel::Channel;
8use futures::{task::AtomicWaker, Stream};
9use instant::Duration;
10use std::{
11    pin::Pin,
12    sync::{
13        atomic::{AtomicBool, Ordering},
14        Arc, Mutex,
15    },
16    task::{Context, Poll},
17};
18
19struct Inner {
20    ready: AtomicBool,
21    period: Mutex<Duration>,
22    waker: AtomicWaker,
23    shutdown_ctl: Channel,
24    period_ctl: Channel<Duration>,
25}
26
27///
28/// `Interval` stream used by the `interval()` function to provide a
29/// a time interval stream. The stream is backed by tokio interval
30/// stream on native platforms and by by the JavaScript `setInterval()`
31/// and `clearInterval()` APIs in WASM32 environment.
32///
33/// This Interval stream has an advantage of having `Send` and `Sync` markers.
34///
35/// Please note that the `Interval` fires upon creation to mimic
36/// the tokio-backed Interval stream available on the native target.
37///
38#[derive(Clone)]
39pub struct Interval {
40    inner: Arc<Inner>,
41}
42
43impl Interval {
44    /// Create a new `Interval` stream that will resolve each given duration.
45    pub fn new(duration: Duration) -> Self {
46        let inner = Arc::new(Inner {
47            ready: AtomicBool::new(false),
48            waker: AtomicWaker::new(),
49            period: Mutex::new(duration),
50            shutdown_ctl: Channel::oneshot(),
51            period_ctl: Channel::unbounded(),
52        });
53
54        let inner_ = inner.clone();
55
56        crate::task::spawn(async move {
57            let mut current_period = *inner_.period.lock().unwrap();
58
59            'outer: loop {
60                let mut interval = tokio::time::interval(current_period);
61
62                'inner: loop {
63                    tokio::select! {
64                        _ = interval.tick() => {
65                            inner_.ready.store(true, Ordering::SeqCst);
66                            inner_.waker.wake();
67                        },
68                        new_period = inner_.period_ctl.recv() => {
69                            if let Ok(new_period) = new_period {
70                                current_period = new_period;
71                            } else {
72                                // if the duration channel is closed, we stop the interval
73                                break 'outer;
74                            }
75                            break 'inner;
76                        },
77                        _ = inner_.shutdown_ctl.recv() => {
78                            break 'outer;
79                        },
80                    }
81                }
82            }
83        });
84
85        Interval { inner }
86    }
87
88    #[inline]
89    fn change_period(&self, period: Duration) {
90        self.inner
91            .period_ctl
92            .try_send(period)
93            .expect("Interval::change_period() unable to send period signal");
94    }
95
96    #[inline]
97    fn shutdown(&self) {
98        self.inner
99            .shutdown_ctl
100            .try_send(())
101            .expect("Interval::shutdown() unable to send shutdown signal");
102    }
103
104    /// Cancel the current timeout.
105    pub fn cancel(&self) {
106        self.shutdown();
107    }
108}
109
110impl Stream for Interval {
111    type Item = ();
112
113    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114        match self.inner.ready.load(Ordering::SeqCst) {
115            true => {
116                self.inner.ready.store(false, Ordering::SeqCst);
117                Poll::Ready(Some(()))
118            }
119            false => {
120                self.inner.waker.register(cx.waker());
121                if self.inner.ready.load(Ordering::SeqCst) {
122                    self.inner.ready.store(false, Ordering::SeqCst);
123                    Poll::Ready(Some(()))
124                } else {
125                    Poll::Pending
126                }
127            }
128        }
129    }
130}
131
132impl Drop for Interval {
133    fn drop(&mut self) {
134        self.shutdown();
135    }
136}
137
138/// `async interval()` function backed by the JavaScript `createInterval()`
139pub fn interval(duration: Duration) -> Interval {
140    Interval::new(duration)
141}