workflow_core/native/
interval.rs1#![allow(dead_code)]
6
7use crate::channel::Channel;
8use futures::{task::AtomicWaker, Stream};
9use instant::Duration;
10use std::{
11 pin::Pin,
12 sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc, Mutex,
15 },
16 task::{Context, Poll},
17};
18
19struct Inner {
20 ready: AtomicBool,
21 period: Mutex<Duration>,
22 waker: AtomicWaker,
23 shutdown_ctl: Channel,
24 period_ctl: Channel<Duration>,
25}
26
27#[derive(Clone)]
39pub struct Interval {
40 inner: Arc<Inner>,
41}
42
43impl Interval {
44 pub fn new(duration: Duration) -> Self {
46 let inner = Arc::new(Inner {
47 ready: AtomicBool::new(false),
48 waker: AtomicWaker::new(),
49 period: Mutex::new(duration),
50 shutdown_ctl: Channel::oneshot(),
51 period_ctl: Channel::unbounded(),
52 });
53
54 let inner_ = inner.clone();
55
56 crate::task::spawn(async move {
57 let mut current_period = *inner_.period.lock().unwrap();
58
59 'outer: loop {
60 let mut interval = tokio::time::interval(current_period);
61
62 'inner: loop {
63 tokio::select! {
64 _ = interval.tick() => {
65 inner_.ready.store(true, Ordering::SeqCst);
66 inner_.waker.wake();
67 },
68 new_period = inner_.period_ctl.recv() => {
69 if let Ok(new_period) = new_period {
70 current_period = new_period;
71 } else {
72 break 'outer;
74 }
75 break 'inner;
76 },
77 _ = inner_.shutdown_ctl.recv() => {
78 break 'outer;
79 },
80 }
81 }
82 }
83 });
84
85 Interval { inner }
86 }
87
88 #[inline]
89 fn change_period(&self, period: Duration) {
90 self.inner
91 .period_ctl
92 .try_send(period)
93 .expect("Interval::change_period() unable to send period signal");
94 }
95
96 #[inline]
97 fn shutdown(&self) {
98 self.inner
99 .shutdown_ctl
100 .try_send(())
101 .expect("Interval::shutdown() unable to send shutdown signal");
102 }
103
104 pub fn cancel(&self) {
106 self.shutdown();
107 }
108}
109
110impl Stream for Interval {
111 type Item = ();
112
113 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114 match self.inner.ready.load(Ordering::SeqCst) {
115 true => {
116 self.inner.ready.store(false, Ordering::SeqCst);
117 Poll::Ready(Some(()))
118 }
119 false => {
120 self.inner.waker.register(cx.waker());
121 if self.inner.ready.load(Ordering::SeqCst) {
122 self.inner.ready.store(false, Ordering::SeqCst);
123 Poll::Ready(Some(()))
124 } else {
125 Poll::Pending
126 }
127 }
128 }
129 }
130}
131
132impl Drop for Interval {
133 fn drop(&mut self) {
134 self.shutdown();
135 }
136}
137
138pub fn interval(duration: Duration) -> Interval {
140 Interval::new(duration)
141}