use futures::Stream;
use rong::{function::*, *};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::{Notify, mpsc};
use tokio::time::{Interval, interval, sleep};
fn get_current_timestamp() -> f64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as f64)
.unwrap_or(0.0)
}
async fn set_timeout(ctx: JSContext, delay: Optional<f64>) -> JSResult<f64> {
let delay = delay.0.unwrap_or(0.0).max(0.0) as u64;
let notifier = Arc::new(Notify::new());
let notifier_clone = notifier.clone();
let registry = ctx
.runtime()
.get_or_init_service::<super::TimerRegistry>()
.clone();
let timer_id = registry.next_id();
registry.register_timer(timer_id, notifier);
let result = tokio::select! {
_ = sleep(Duration::from_millis(delay)) => Ok(get_current_timestamp()),
_ = notifier_clone.notified() => Ok(get_current_timestamp()),
};
registry.cancel_timer(timer_id);
result
}
async fn set_immediate(ctx: JSContext) -> JSResult<f64> {
let notifier = Arc::new(Notify::new());
let notifier_clone = notifier.clone();
let registry = ctx
.runtime()
.get_or_init_service::<super::TimerRegistry>()
.clone();
let timer_id = registry.next_id();
registry.register_timer(timer_id, notifier);
let result = tokio::select! {
_ = tokio::task::yield_now() => Ok(get_current_timestamp()),
_ = notifier_clone.notified() => Ok(get_current_timestamp()),
};
registry.cancel_timer(timer_id);
result
}
struct IntervalStream {
interval: Interval,
notify_rx: tokio::sync::mpsc::Receiver<()>,
registry: super::TimerRegistry, timer_id: u32, canceled: std::sync::Arc<AtomicBool>,
}
impl Stream for IntervalStream {
type Item = f64;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.canceled.load(Ordering::SeqCst)
|| matches!(self.notify_rx.poll_recv(cx), Poll::Ready(_))
{
return Poll::Ready(None);
}
match self.interval.poll_tick(cx) {
Poll::Ready(_) => Poll::Ready(Some(get_current_timestamp())),
Poll::Pending => Poll::Pending,
}
}
}
impl Drop for IntervalStream {
fn drop(&mut self) {
self.canceled.store(true, Ordering::SeqCst);
self.registry.cancel_timer(self.timer_id);
}
}
pub fn set_interval(ctx: JSContext, delay: Optional<f64>) -> JSResult<JSObject> {
let delay_ms = delay.0.unwrap_or(0.0);
let delay_ms = if delay_ms.is_finite() && delay_ms > 0.0 {
delay_ms
} else {
0.0
};
let (notify_tx, notify_rx) = mpsc::channel::<()>(1);
let registry = ctx
.runtime()
.get_or_init_service::<super::TimerRegistry>()
.clone();
let timer_id = registry.next_id();
let notifier = Arc::new(Notify::new());
let notifier_clone = notifier.clone();
registry.register_timer(timer_id, notifier);
let canceled = std::sync::Arc::new(AtomicBool::new(false));
let canceled_clone = canceled.clone();
spawn(async move {
notifier_clone.notified().await;
let _ = notify_tx.send(()).await;
canceled_clone.store(true, Ordering::SeqCst);
});
let stream = IntervalStream {
interval: interval(Duration::from_millis(delay_ms as u64).max(Duration::from_millis(1))),
notify_rx,
registry: registry.clone(),
timer_id,
canceled,
};
stream.to_js_async_iter(&ctx)
}
pub(crate) fn init(ctx: &JSContext) -> JSResult<()> {
let timer = JSObject::new(ctx);
timer
.set(
"setTimeout",
JSFunc::new(ctx, set_timeout)?.name("setTimeout")?,
)?
.set(
"setImmediate",
JSFunc::new(ctx, set_immediate)?.name("setImmediate")?,
)?
.set(
"setInterval",
JSFunc::new(ctx, set_interval)?.name("setInterval")?,
)?;
ctx.global().set("timers", timer)?;
Ok(())
}