use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime};
use super::{BoxFuture, Runtime, SpawnHandle, SpawnHandleInner};
/// Stateless runtime whose tasks each run on their own OS thread.
///
/// The type has no fields, so it is zero-sized and cheap to copy; all of its
/// behaviour lives in its `Runtime` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct EmbeddedRuntime;
impl Runtime for EmbeddedRuntime {
fn spawn(&self, fut: BoxFuture<()>) -> SpawnHandle {
let aborted = Arc::new(std::sync::atomic::AtomicBool::new(false));
let finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
let aborted_for_task = Arc::clone(&aborted);
let finished_for_task = Arc::clone(&finished);
let handle = thread::spawn(move || {
let wrapped = AbortableFuture {
inner: fut,
aborted: aborted_for_task,
};
futures::executor::block_on(wrapped);
finished_for_task.store(true, std::sync::atomic::Ordering::SeqCst);
});
SpawnHandle::from_inner(EmbeddedSpawnHandle {
handle: Some(handle),
aborted,
finished,
})
}
fn sleep(&self, duration: Duration) -> BoxFuture<()> {
Box::pin(SleepFuture::new(duration))
}
fn now_monotonic(&self) -> Instant {
Instant::now()
}
fn now_wall_clock(&self) -> SystemTime {
SystemTime::now()
}
}
pub(super) struct EmbeddedSpawnHandle {
handle: Option<JoinHandle<()>>,
aborted: Arc<std::sync::atomic::AtomicBool>,
finished: Arc<std::sync::atomic::AtomicBool>,
}
impl SpawnHandleInner for EmbeddedSpawnHandle {
fn abort(&self) {
self.aborted
.store(true, std::sync::atomic::Ordering::SeqCst);
}
fn is_finished(&self) -> bool {
self.finished.load(std::sync::atomic::Ordering::SeqCst)
|| self
.handle
.as_ref()
.map(|h| h.is_finished())
.unwrap_or(false)
}
}
struct AbortableFuture {
inner: BoxFuture<()>,
aborted: Arc<std::sync::atomic::AtomicBool>,
}
impl Future for AbortableFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.aborted.load(std::sync::atomic::Ordering::SeqCst) {
return Poll::Ready(());
}
self.inner.as_mut().poll(cx)
}
}
/// Timer future behind `EmbeddedRuntime::sleep`; resolves once `deadline` has passed.
///
/// The first pending poll starts a helper thread that waits for the deadline
/// and then wakes the task. Dropping the future tells that thread to exit
/// early instead of lingering for the rest of the sleep.
struct SleepFuture {
    /// Instant at or after which `poll` returns `Ready`.
    deadline: Instant,
    /// State shared with the helper thread.
    inner: Arc<SleepShared>,
}

/// Timer state plus the condition variable used to interrupt the helper thread.
struct SleepShared {
    state: Mutex<SleepInner>,
    /// Notified when the owning future is dropped.
    cancel: std::sync::Condvar,
}

/// Mutable timer state; always accessed through `SleepShared::state`.
struct SleepInner {
    /// Waker from the most recent pending poll.
    waker: Option<Waker>,
    /// Whether the helper thread has been started (at most once per future).
    waiter_spawned: bool,
    /// Set when the future is dropped; the helper thread then exits without waking.
    cancelled: bool,
}

impl SleepFuture {
    /// Stand-in delay (about 30 years) used when `now + duration` cannot be
    /// represented as an `Instant`.
    const FAR_FUTURE: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);

    /// Creates a timer that expires `duration` from now.
    ///
    /// Durations too large to add to the current instant (for example
    /// `Duration::MAX`) are clamped to a far-future deadline instead of panicking.
    fn new(duration: Duration) -> Self {
        let now = Instant::now();
        let deadline = now
            .checked_add(duration)
            .unwrap_or_else(|| now + Self::FAR_FUTURE);
        Self {
            deadline,
            inner: Arc::new(SleepShared {
                state: Mutex::new(SleepInner {
                    waker: None,
                    waiter_spawned: false,
                    cancelled: false,
                }),
                cancel: std::sync::Condvar::new(),
            }),
        }
    }

    /// Starts the helper thread that wakes the task once the deadline passes.
    fn spawn_waiter(&self) {
        let deadline = self.deadline;
        let shared = Arc::clone(&self.inner);
        thread::spawn(move || {
            // A poisoned lock means another holder panicked; give up quietly,
            // as the original best-effort wake-up did.
            let mut state = match shared.state.lock() {
                Ok(guard) => guard,
                Err(_) => return,
            };
            let waker = loop {
                if state.cancelled {
                    return;
                }
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    break state.waker.take();
                }
                // May return early (spurious wake-up or cancellation); the
                // loop re-checks both conditions.
                state = match shared.cancel.wait_timeout(state, remaining) {
                    Ok((guard, _timed_out)) => guard,
                    Err(_) => return,
                };
            };
            // Release the lock before waking so the waker never runs under it.
            drop(state);
            if let Some(waker) = waker {
                waker.wake();
            }
        });
    }
}

impl Future for SleepFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Fast path: an expired timer never needs the lock.
        if Instant::now() >= self.deadline {
            return Poll::Ready(());
        }
        let start_waiter = {
            // A poisoned mutex here is treated as an unrecoverable bug.
            #[allow(clippy::expect_used)]
            let mut state = self.inner.state.lock().expect("SleepFuture mutex poisoned");
            // Re-check under the lock. The helper thread only takes the waker
            // while holding this lock and after the deadline, so a poll that
            // still sees time remaining here is guaranteed to have its waker
            // picked up; checking only before locking could lose the wake-up.
            if Instant::now() >= self.deadline {
                return Poll::Ready(());
            }
            let registered = match &state.waker {
                Some(existing) => existing.will_wake(cx.waker()),
                None => false,
            };
            if !registered {
                state.waker = Some(cx.waker().clone());
            }
            let first_pending_poll = !state.waiter_spawned;
            state.waiter_spawned = true;
            first_pending_poll
        };
        if start_waiter {
            self.spawn_waiter();
        }
        Poll::Pending
    }
}

impl Drop for SleepFuture {
    /// Tells the helper thread (if any) to stop waiting.
    fn drop(&mut self) {
        // If the lock is poisoned the helper thread gives up on its own, so
        // there is nothing to signal.
        if let Ok(mut state) = self.inner.state.lock() {
            state.cancelled = true;
        }
        self.inner.cancel.notify_all();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Generous upper bound for a spawned task to complete, so the round-trip
    /// test does not depend on scheduler timing.
    const SETTLE_TIMEOUT: Duration = Duration::from_secs(5);

    /// Blocks the calling thread on the runtime's own `sleep` future.
    fn pause(rt: &dyn Runtime, duration: Duration) {
        futures::executor::block_on(rt.sleep(duration));
    }

    /// A spawned task that sleeps briefly runs to completion and is reported
    /// as finished by its handle.
    #[test]
    fn spawn_and_sleep_round_trip() {
        let rt: Arc<dyn Runtime> = Arc::new(EmbeddedRuntime);
        let counter = Arc::new(AtomicU32::new(0));
        let c = Arc::clone(&counter);
        let rt_for_task = Arc::clone(&rt);
        let handle = rt.spawn(Box::pin(async move {
            rt_for_task.sleep(Duration::from_millis(10)).await;
            c.fetch_add(1, Ordering::SeqCst);
        }));
        // Wait until the task reports completion rather than for a fixed
        // interval; a fixed 100 ms wait can lose the race on a loaded machine.
        let give_up = Instant::now() + SETTLE_TIMEOUT;
        while !handle.is_finished() && Instant::now() < give_up {
            pause(&*rt, Duration::from_millis(5));
        }
        assert!(handle.is_finished());
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Aborting a task that is parked on a long sleep prevents the code after
    /// the sleep from running.
    #[test]
    fn abort_cancels_task() {
        let rt: Arc<dyn Runtime> = Arc::new(EmbeddedRuntime);
        let counter = Arc::new(AtomicU32::new(0));
        let c = Arc::clone(&counter);
        let rt_for_task = Arc::clone(&rt);
        let handle = rt.spawn(Box::pin(async move {
            rt_for_task.sleep(Duration::from_secs(60)).await;
            c.fetch_add(1, Ordering::SeqCst);
        }));
        pause(&*rt, Duration::from_millis(50));
        handle.abort();
        pause(&*rt, Duration::from_millis(50));
        assert_eq!(counter.load(Ordering::SeqCst), 0);
    }

    /// Two successive monotonic readings never decrease.
    #[test]
    fn monotonic_clock_does_not_go_backwards() {
        let rt = EmbeddedRuntime;
        let a = rt.now_monotonic();
        for _ in 0..1000 {
            std::hint::black_box(a);
        }
        let b = rt.now_monotonic();
        assert!(b >= a, "monotonic clock went backwards: {:?} → {:?}", a, b);
    }

    /// The wall clock reports a time after the Unix epoch.
    #[test]
    fn wall_clock_is_after_unix_epoch() {
        let rt = EmbeddedRuntime;
        let now = rt.now_wall_clock();
        assert!(now > SystemTime::UNIX_EPOCH);
    }

    /// `EmbeddedRuntime` can be used behind `&dyn Runtime`.
    #[test]
    fn embedded_runtime_is_object_safe() {
        fn assert_runtime_obj_safe(_: &dyn Runtime) {}
        let rt = EmbeddedRuntime;
        assert_runtime_obj_safe(&rt);
    }
}