use std::{
cell::RefCell,
marker::PhantomData,
pin::Pin,
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll, Waker},
};
use futures::task::{waker, ArcWake, AtomicWaker};
use parking_lot::Mutex;
use slotmap::{new_key_type, SlotMap};
use crate::{effect::Effect, Frame};
new_key_type! {struct TaskId; }
struct Task<Data> {
effect: Pin<Box<dyn Effect<Data>>>,
_marker: PhantomData<Data>,
}
impl<Data> Task<Data> {
fn new(effect: Pin<Box<dyn Effect<Data>>>) -> Self {
Self {
effect,
_marker: PhantomData,
}
}
pub fn poll(&mut self, context: &mut Context, data: &mut Data) -> Poll<()> {
self.effect.as_mut().poll(context, data)
}
}
struct TaskWaker {
id: TaskId,
shared: Arc<Shared>,
}
impl ArcWake for TaskWaker {
fn wake_by_ref(t: &Arc<Self>) {
t.shared.push_ready(t.id);
}
}
struct Shared {
woken: AtomicBool,
executor_waker: AtomicWaker,
ready: Mutex<Vec<TaskId>>,
}
impl Shared {
fn push_ready(&self, id: TaskId) {
self.ready.lock().push(id);
self.woken.store(true, Ordering::Relaxed);
self.executor_waker.wake();
}
}
pub struct Executor<Data = Frame> {
tasks: SlotMap<TaskId, (Task<Data>, Waker)>,
processing: Vec<TaskId>,
shared: Arc<Shared>,
incoming: Rc<RefCell<Vec<Task<Data>>>>,
}
pub struct Spawner<Data> {
incoming: std::rc::Weak<RefCell<Vec<Task<Data>>>>,
}
impl<Data> Spawner<Data> {
pub fn spawn(&self, effect: impl 'static + Effect<Data>) {
let incoming = self.incoming.upgrade().expect("Executor dropped");
let task = Task::new(Box::pin(effect));
incoming.borrow_mut().push(task);
}
}
impl<Data> Executor<Data> {
pub fn new() -> Self {
let shared = Arc::new(Shared {
executor_waker: AtomicWaker::new(),
ready: Default::default(),
woken: AtomicBool::new(false),
});
let incoming = Default::default();
Self {
tasks: SlotMap::with_key(),
shared,
processing: Vec::new(),
incoming,
}
}
pub fn spawner(&self) -> Spawner<Data> {
Spawner {
incoming: Rc::downgrade(&self.incoming),
}
}
pub fn poll_tick(&mut self, data: &mut Data, cx: &mut Context<'_>) -> Poll<()> {
self.shared.executor_waker.register(cx.waker());
if self
.shared
.woken
.compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
tracing::info!("Executor ready");
self.tick(data);
Poll::Ready(())
} else {
Poll::Pending
}
}
pub fn tick(&mut self, data: &mut Data) {
{
assert!(self.processing.is_empty());
core::mem::swap(&mut *self.shared.ready.lock(), &mut self.processing);
}
self.processing
.extend(self.incoming.borrow_mut().drain(..).map(|task| {
self.tasks.insert_with_key(|id| {
let waker = waker(Arc::new(TaskWaker {
id,
shared: self.shared.clone(),
}));
(task, waker)
})
}));
for id in self.processing.drain(..) {
let (task, waker) = self.tasks.get_mut(id).unwrap();
let mut context = Context::from_waker(&*waker);
tracing::debug!(?id, "Polling task");
if task.poll(&mut context, data).is_ready() {
tracing::debug!(?id, "Task completed");
self.tasks.remove(id);
}
}
}
}
impl<Data> Default for Executor<Data> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use crate::effect::FutureEffect;
use super::*;
#[test]
fn single_test() {
let (tx, rx) = flume::unbounded();
let mut ex = Executor::new();
let spawner = ex.spawner();
spawner.spawn(FutureEffect::new(rx.into_recv_async(), |data, val| {
*data = Some(val.unwrap());
}));
let mut data = None;
ex.tick(&mut data);
assert_eq!(data, None);
tx.send(5).unwrap();
ex.tick(&mut data);
assert_eq!(data, Some(5));
}
}