#![allow(dead_code)]
use futures::{task::AtomicWaker, Stream};
use instant::Duration;
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
task::{Context, Poll},
};
use wasm_bindgen::prelude::*;
use super::overrides::init_timer_overrides;
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen (catch, js_name = setInterval)]
pub fn set_interval(
closure: &Closure<dyn FnMut()>,
timeout: u32,
) -> std::result::Result<JsValue, JsValue>;
#[wasm_bindgen (catch, js_name = clearInterval)]
pub fn clear_interval(interval: &JsValue) -> std::result::Result<(), JsValue>;
}
type IntervalClosure = Closure<dyn FnMut()>;
struct IntervalContext {
period: Duration,
instance: JsValue,
#[allow(dead_code)]
closure: IntervalClosure,
}
unsafe impl Sync for IntervalContext {}
unsafe impl Send for IntervalContext {}
struct Inner {
ready: AtomicBool,
waker: AtomicWaker,
ctx: Mutex<Option<IntervalContext>>,
}
#[derive(Clone)]
pub struct Interval {
inner: Arc<Inner>,
}
impl Interval {
pub fn new(period: Duration) -> Self {
if let Err(e) = init_timer_overrides() {
workflow_log::log_error!("{e}");
}
let inner = Arc::new(Inner {
ready: AtomicBool::new(true),
ctx: Mutex::new(None),
waker: AtomicWaker::new(),
});
let inner_ = inner.clone();
let closure = Closure::new(move || {
inner_.ready.store(true, Ordering::SeqCst);
if let Some(waker) = inner_.waker.take() {
waker.wake();
}
});
let instance = set_interval(&closure, period.as_millis() as u32).unwrap();
inner.ctx.lock().unwrap().replace(IntervalContext {
period,
instance,
closure,
});
Interval { inner }
}
#[inline]
pub fn period(&self) -> Duration {
self.inner.ctx.lock().unwrap().as_ref().unwrap().period
}
#[inline]
pub fn change_period(&self, period: Duration) {
if let Some(ctx) = self.inner.ctx.lock().unwrap().as_mut() {
clear_interval(ctx.instance.as_ref()).unwrap();
let instance = set_interval(&ctx.closure, period.as_millis() as u32).unwrap();
ctx.instance = instance;
}
}
#[inline]
fn clear(&self) {
if let Some(ctx) = self.inner.ctx.lock().unwrap().take() {
clear_interval(ctx.instance.as_ref()).unwrap();
}
}
pub fn cancel(&self) {
self.clear();
}
}
impl Stream for Interval {
type Item = ();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.inner.ready.load(Ordering::SeqCst) {
true => {
self.inner.ready.store(false, Ordering::SeqCst);
Poll::Ready(Some(()))
}
false => {
self.inner.waker.register(cx.waker());
if self.inner.ready.load(Ordering::SeqCst) {
self.inner.ready.store(false, Ordering::SeqCst);
Poll::Ready(Some(()))
} else {
Poll::Pending
}
}
}
}
}
impl Drop for Interval {
fn drop(&mut self) {
self.clear();
}
}
pub fn interval(duration: Duration) -> Interval {
Interval::new(duration)
}