1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
//!
//! `Sleep` future and an `async sleep()` function backed by
//! the JavaScript `setTimeout()` and `clearTimeout()` APIs.
//!

#![allow(dead_code)]

use futures::future::FusedFuture;
use futures::task::AtomicWaker;
use instant::Duration;
use std::future::Future;
use std::{
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    task::{Context, Poll},
};
use wasm_bindgen::prelude::*;

use super::overrides::init_timer_overrides;

#[wasm_bindgen]
extern "C" {
    /// Binding to the JavaScript `setTimeout()` function.
    ///
    /// `closure` is the callback to schedule and `timeout` is the delay handed
    /// to `setTimeout()` (callers in this module pass milliseconds).
    ///
    /// Returns `Ok` with the value produced by `setTimeout()`, which is the
    /// handle later given to `clear_timeout()`. Because the binding is declared
    /// with `catch`, a failure is reported as `Err(JsValue)`.
    #[wasm_bindgen (catch, js_name = setTimeout)]
    pub fn set_timeout(
        closure: &Closure<dyn FnMut()>,
        timeout: u32,
    ) -> std::result::Result<JsValue, JsValue>;
    /// Binding to the JavaScript `clearTimeout()` function.
    ///
    /// `interval` is the handle previously returned by `set_timeout()`.
    /// Because the binding is declared with `catch`, a failure is reported
    /// as `Err(JsValue)`.
    #[wasm_bindgen (catch, js_name = clearTimeout)]
    pub fn clear_timeout(interval: &JsValue) -> std::result::Result<(), JsValue>;
}

/// Callback type registered with `set_timeout()`: a `Closure` wrapping a `dyn FnMut()`.
type SleepClosure = Closure<dyn FnMut()>;

/// State of a scheduled timeout: the handle returned by `set_timeout()`
/// together with the closure that was registered as its callback.
struct SleepContext {
    /// Value returned by `set_timeout()`; passed to `clear_timeout()` to
    /// cancel the pending timeout.
    instance: JsValue,
    // This closure is never read, but it must be retained
    // for the lifetime of this context.
    // NOTE(review): presumably dropping it earlier would invalidate the
    // callback that JavaScript still references — confirm.
    #[allow(dead_code)]
    closure: SleepClosure,
}

// SAFETY: NOTE(review) — these impls assert that a `SleepContext` (a `JsValue`
// plus a `Closure`) may be shared with and sent to other threads. Presumably
// this is only sound because the intended WASM32 browser environment runs
// single-threaded; confirm before using this type on a target with real threads.
unsafe impl Sync for SleepContext {}
unsafe impl Send for SleepContext {}

/// State shared between every `Sleep` handle and the timeout callback.
struct Inner {
    /// Set to `true` by the timeout callback once the timeout has fired;
    /// read by `poll()` to decide between `Pending` and `Ready`.
    ready: AtomicBool,
    /// Set to `true` by `poll()` when it observes completion; reported by
    /// `FusedFuture::is_terminated()`.
    consumed: AtomicBool,
    /// Waker registered by `poll()` and woken by the timeout callback.
    waker: AtomicWaker,
    /// Timeout handle and callback closure. `None` before the timeout has
    /// been scheduled and after it has been cleared or consumed.
    ctx: Mutex<Option<SleepContext>>,
}

/// `Sleep` future used by the `sleep()` function to provide a
/// timeout future that is backed by the JavaScript `setTimeout()`
/// and `clearTimeout()` APIs. The `Sleep` future is meant only for
/// use in WASM32 browser environments. It has the advantage of
/// carrying the `Send` and `Sync` markers.
///
/// Cloning a `Sleep` produces another handle to the same shared state
/// (the `Arc<Inner>`), not an independent timer. Cancelling or dropping
/// any handle clears the underlying timeout for all of them.
#[derive(Clone)]
pub struct Sleep {
    /// Shared timer state; also referenced by the timeout callback.
    inner: Arc<Inner>,
}

impl Sleep {
    /// Create a new `Sleep` future that will resolve after the given duration.
    ///
    /// The timeout is scheduled immediately via `set_timeout()`; the returned
    /// future becomes ready once the callback has fired.
    ///
    /// Durations longer than `i32::MAX` milliseconds (about 24.8 days) are
    /// clamped to that value. Browsers keep the `setTimeout()` delay in a
    /// signed 32-bit integer, so a larger value (or one that wrapped during
    /// the conversion to `u32`) would make the timer fire right away.
    ///
    /// # Panics
    ///
    /// Panics if `set_timeout()` reports an error or if the internal mutex
    /// is poisoned.
    pub fn new(duration: Duration) -> Self {
        // Largest delay, in milliseconds, that is forwarded to `setTimeout()`.
        const MAX_TIMEOUT_MILLIS: u128 = i32::MAX as u128;

        // A failure to install the timer overrides is logged, not fatal.
        if let Err(e) = init_timer_overrides() {
            workflow_log::log_error!("{e}");
        }
        let inner = Arc::new(Inner {
            ready: AtomicBool::new(false),
            consumed: AtomicBool::new(false),
            waker: AtomicWaker::new(),
            ctx: Mutex::new(None),
        });

        // The callback flags completion first and only then wakes the task,
        // so a woken `poll()` always observes `ready == true`.
        let inner_ = inner.clone();
        let closure = Closure::new(move || {
            inner_.ready.store(true, Ordering::SeqCst);
            inner_.waker.wake();
        });

        // Clamp before narrowing so the `as u32` cast can never truncate.
        let millis = duration.as_millis().min(MAX_TIMEOUT_MILLIS) as u32;
        let instance = set_timeout(&closure, millis).expect("setTimeout() failed");

        // Keep the handle (for cancellation) and the closure (so the callback
        // stays valid) alive inside the shared state.
        inner
            .ctx
            .lock()
            .unwrap()
            .replace(SleepContext { instance, closure });

        Sleep { inner }
    }

    /// Clear the pending timeout, if any, and release its context.
    ///
    /// Does nothing when the context has already been taken (the timeout was
    /// consumed or cleared before). This runs from `Drop`, so a failing
    /// `clear_timeout()` is logged instead of panicking.
    #[inline]
    fn clear(&self) {
        // Take the context in its own statement so the mutex guard is
        // released before calling into JavaScript.
        let ctx = self.inner.ctx.lock().unwrap().take();
        if let Some(ctx) = ctx {
            if let Err(e) = clear_timeout(&ctx.instance) {
                workflow_log::log_error!("clearTimeout() failed: {e:?}");
            }
        }
    }

    /// Cancel the current timeout.
    ///
    /// After cancellation the callback never fires, so a future that has not
    /// completed yet stays pending.
    pub fn cancel(&self) {
        self.clear();
    }
}

impl Future for Sleep {
    type Output = ();

    /// Resolve to `()` once the timeout callback has fired.
    ///
    /// While the timeout is pending, the task's waker is registered and the
    /// `ready` flag is checked a second time, so a callback that fires between
    /// the first check and the registration is not missed.
    ///
    /// Every completion path marks the future as consumed (so
    /// `FusedFuture::is_terminated()` reports `true`) and releases the
    /// timeout context.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &self.inner;

        if !inner.ready.load(Ordering::SeqCst) {
            inner.waker.register(cx.waker());
            // Re-check after registering: the callback may have fired in between.
            if !inner.ready.load(Ordering::SeqCst) {
                return Poll::Pending;
            }
        }

        inner.consumed.store(true, Ordering::SeqCst);
        // The timeout has fired; the handle and closure are no longer needed.
        inner.ctx.lock().unwrap().take();
        Poll::Ready(())
    }
}

impl Drop for Sleep {
    fn drop(&mut self) {
        self.clear();
    }
}

/// Returns a `Sleep` future that resolves after `duration`, backed by the
/// JavaScript `setTimeout()` API. The timeout is scheduled as soon as this
/// function is called, not when the future is first polled.
pub fn sleep(duration: Duration) -> Sleep {
    Sleep::new(duration)
}

impl FusedFuture for Sleep {
    /// Reports whether `poll()` has already recorded completion in the
    /// shared `consumed` flag.
    fn is_terminated(&self) -> bool {
        let consumed = &self.inner.consumed;
        consumed.load(Ordering::SeqCst)
    }
}