use std::{
future::Future,
pin::{Pin, pin},
sync::{
Arc, Mutex,
atomic::{self, AtomicBool},
mpsc::{Sender, TryRecvError, channel},
},
task::{Context, Poll, Wake, Waker},
thread::{self, JoinHandle, Thread},
};
use crate::{
infrastructure::error::{DdsError, DdsResult},
runtime::Spawner,
};
pub fn block_timeout<T>(
duration: core::time::Duration,
future: impl Future<Output = T>,
) -> DdsResult<T> {
struct ChannelWake(std::sync::mpsc::SyncSender<()>);
impl Wake for ChannelWake {
fn wake(self: std::sync::Arc<Self>) {
self.wake_by_ref();
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.send(()).ok();
}
}
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let waker = Waker::from(Arc::new(ChannelWake(sender)));
let mut cx = Context::from_waker(&waker);
let mut pinned_fut = pin!(future);
let start_instant = std::time::Instant::now();
loop {
match pinned_fut.as_mut().poll(&mut cx) {
Poll::Ready(t) => return Ok(t),
Poll::Pending => {
if let Some(timeout) =
duration.checked_sub(std::time::Instant::now().duration_since(start_instant))
{
match receiver.recv_timeout(timeout) {
Ok(_) => (),
Err(_) => return Err(DdsError::Timeout),
}
} else {
return Err(DdsError::Timeout);
}
}
}
}
}
pub fn block_on<T>(f: impl Future<Output = T>) -> T {
struct ThreadWake(Thread);
impl Wake for ThreadWake {
fn wake(self: std::sync::Arc<Self>) {
self.wake_by_ref()
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.unpark()
}
}
let waker = Waker::from(Arc::new(ThreadWake(thread::current())));
let mut cx = Context::from_waker(&waker);
let mut pinned_fut = pin!(f);
loop {
match pinned_fut.as_mut().poll(&mut cx) {
Poll::Ready(t) => return t,
Poll::Pending => thread::park(),
}
}
}
pub struct Task {
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
task_sender: Sender<Arc<Task>>,
thread_handle: Thread,
finished: AtomicBool,
}
impl Task {
fn is_finished(&self) -> bool {
self.finished.load(atomic::Ordering::Acquire)
}
}
impl Wake for Task {
fn wake(self: Arc<Self>) {
self.wake_by_ref()
}
fn wake_by_ref(self: &Arc<Self>) {
if !self.is_finished() {
self.task_sender.send(self.clone()).unwrap();
self.thread_handle.unpark();
}
}
}
#[derive(Clone)]
pub struct ExecutorHandle {
task_sender: Sender<Arc<Task>>,
thread_handle: Thread,
}
impl ExecutorHandle {
pub fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
let future = Box::pin(f);
let task = Arc::new(Task {
future: Mutex::new(future),
task_sender: self.task_sender.clone(),
thread_handle: self.thread_handle.clone(),
finished: AtomicBool::new(false),
});
self.task_sender
.send(task.clone())
.expect("Should never fail to send");
self.thread_handle.unpark();
}
}
impl Spawner for ExecutorHandle {
fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
self.spawn(f);
}
}
pub struct Executor {
task_sender: Sender<Arc<Task>>,
executor_thread_handle: JoinHandle<()>,
}
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}
impl Executor {
pub fn new() -> Self {
let (task_sender, task_receiver) = channel::<Arc<Task>>();
let executor_thread_handle = std::thread::Builder::new()
.name("Dust DDS Executor".to_string())
.spawn(move || {
loop {
match task_receiver.try_recv() {
Ok(task) => {
if !task.is_finished() {
let waker = Waker::from(task.clone());
let mut cx = Context::from_waker(&waker);
let poll_result = task
.future
.try_lock()
.expect("Only ever locked here")
.as_mut()
.poll(&mut cx);
if matches!(poll_result, Poll::Ready(_)) {
task.finished.store(true, atomic::Ordering::Relaxed);
}
}
}
Err(TryRecvError::Empty) => thread::park(),
Err(TryRecvError::Disconnected) => break,
}
}
})
.expect("failed to spawn thread");
Self {
task_sender,
executor_thread_handle,
}
}
pub fn handle(&self) -> ExecutorHandle {
ExecutorHandle {
task_sender: self.task_sender.clone(),
thread_handle: self.executor_thread_handle.thread().clone(),
}
}
}