use super::super::Command;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll, Wake, Waker};
use crossbeam_channel::{Receiver, Sender};
use futures::future::BoxFuture;
use std::sync::atomic::AtomicBool;
use futures::task::AtomicWaker;
use std::sync::Arc;
/// Identifier of a task: the `usize` key under which the task is stored in
/// the command's task collection (`run_task` looks tasks up by `task_id.0`).
///
/// The wrapped index is a plain integer, so the type is `Copy` and supports
/// total equality and hashing, which lets it be used as a set or map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct TaskId(pub(crate) usize);
/// A unit of work owned by a command: a boxed future plus the shared flags
/// and waker channel used to observe and control it.
pub(crate) struct Task {
/// Wakers waiting to be notified; `wake_join_handles` drains this receiver
/// and wakes every waker found in it.
/// NOTE(review): presumably fed by `JoinHandle::register_waker` — confirm at
/// the spawn site.
pub(crate) join_handle_wakers: Receiver<Waker>,
/// Set to `true` by `run_until_settled` once the task has completed or been
/// cancelled and was removed from the task collection.
pub(crate) finished: Arc<AtomicBool>,
/// Abort flag read by `is_aborted`; when it is set, `run_task` reports the
/// task as completed without polling its future.
pub(crate) aborted: Arc<AtomicBool>,
/// The future polled by `run_task`.
pub(crate) future: BoxFuture<'static, ()>,
}
impl Task {
pub(crate) fn is_aborted(&self) -> bool {
self.aborted.load(Ordering::Acquire)
}
fn wake_join_handles(&self) {
for waker in self.join_handle_wakers.try_iter() {
waker.wake();
}
}
}
/// Waker state handed to a task's future while it is polled by `run_task`.
///
/// Waking it re-queues the task, records that a wake happened and wakes the
/// parent waker (see the `Wake` implementation).
pub(crate) struct CommandWaker {
/// Id of the task this waker belongs to; sent on `ready_queue` on every wake.
pub(crate) task_id: TaskId,
/// Channel on which the task id is sent when the waker fires.
pub(crate) ready_queue: Sender<TaskId>,
/// Waker that is woken after the task has been re-queued.
pub(crate) parent_waker: Arc<AtomicWaker>,
/// Set to `true` on every wake; `run_task` reads it after polling to tell
/// whether the task was woken during (or right after) the poll.
woken: AtomicBool,
}
impl Wake for CommandWaker {
    /// Consuming wake; behaves exactly like `wake_by_ref`.
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    /// Re-queues the task, records the wake-up and then wakes the parent.
    fn wake_by_ref(self: &Arc<Self>) {
        let Self {
            task_id,
            ready_queue,
            parent_waker,
            woken,
        } = &**self;

        // A failed send means the receiving side of the ready queue is gone;
        // the error is deliberately ignored.
        let _ = ready_queue.send(*task_id);
        // The flag is raised only after the id has been queued, and before the
        // parent waker is notified.
        woken.store(true, Ordering::Release);
        parent_waker.wake();
    }
}
/// A cloneable handle around a shared abort flag.
///
/// All clones share the same `Arc<AtomicBool>`, so calling [`AbortHandle::abort`]
/// on any of them is visible through every other holder of the flag.
#[derive(Clone, Debug)]
pub struct AbortHandle {
    /// The shared flag; `true` once an abort has been requested.
    pub(crate) aborted: Arc<AtomicBool>,
}

impl AbortHandle {
    /// Requests an abort by setting the shared flag to `true`.
    ///
    /// The store uses `Release` ordering, pairing with the `Acquire` loads
    /// used by readers of abort flags in this module. Calling it more than
    /// once has no further effect.
    pub fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }
}
/// A cloneable handle to a task that can be awaited (it implements `Future`
/// with `Output = ()`) and used to request the task's abort.
#[derive(Clone)]
pub struct JoinHandle {
/// Channel on which `poll` sends a clone of the current waker while the task
/// has not finished yet.
pub(crate) register_waker: Sender<Waker>,
/// Flag read by `is_finished`; `poll` resolves immediately once it is `true`.
pub(crate) finished: Arc<AtomicBool>,
/// Flag set to `true` by `abort`.
pub(crate) aborted: Arc<AtomicBool>,
}
impl JoinHandle {
pub fn abort(&self) {
self.aborted.store(true, Ordering::Release);
}
pub(crate) fn is_finished(&self) -> bool {
self.finished.load(Ordering::Acquire)
}
}
impl Future for JoinHandle {
    type Output = ();

    /// Resolves once the task is finished.
    ///
    /// While the task is still running, a clone of the current waker is sent
    /// over `register_waker` so it can be woken when the task finishes. If the
    /// channel is disconnected, the handle resolves immediately.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.is_finished() {
            return Poll::Ready(());
        }

        if self.register_waker.send(cx.waker().clone()).is_err() {
            // The receiving side has been dropped: nobody is left to wake us,
            // so report readiness right away.
            return Poll::Ready(());
        }

        // Re-check after registering. The task may have been marked finished
        // and its queued wakers drained between the first check and the send
        // above; the waker we just queued would then never be woken, and
        // returning `Pending` would leave this future stuck.
        if self.is_finished() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
/// Outcome of a single `run_task` call.
///
/// The enum carries no data, so it is `Copy` and supports total equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TaskState {
    /// No task is stored under the requested id.
    Missing,
    /// The future returned `Poll::Pending` and may still be woken later.
    Suspended,
    /// The future returned `Poll::Ready`, or the task had been aborted.
    Completed,
    /// The future is pending, was not woken during the poll, and no clone of
    /// its waker is held anywhere else.
    Cancelled,
}
impl<Effect, Event> Command<Effect, Event> {
    /// Runs ready tasks until the ready queue is empty and no new tasks have
    /// been spawned.
    ///
    /// If the command has been aborted, no task is polled: the spawn queue is
    /// drained and every stored task is dropped.
    pub(crate) fn run_until_settled(&mut self) {
        if self.was_aborted() {
            // Drain the spawn queue first so newly spawned tasks are dropped
            // along with the existing ones.
            self.spawn_new_tasks();
            self.tasks.clear();
            return;
        }

        self.spawn_new_tasks();
        while !self.ready_queue.is_empty() {
            while let Ok(task_id) = self.ready_queue.try_recv() {
                let settled = match self.run_task(task_id) {
                    TaskState::Missing | TaskState::Suspended => false,
                    TaskState::Completed | TaskState::Cancelled => true,
                };
                if !settled {
                    continue;
                }

                // Mark the task finished and notify waiters before it is
                // dropped.
                let finished_task = self.tasks.remove(task_id.0);
                finished_task.finished.store(true, Ordering::Release);
                finished_task.wake_join_handles();
                drop(finished_task);
            }
            // Tasks polled above may have spawned more work.
            self.spawn_new_tasks();
        }
    }

    /// Polls the task stored under `task_id` once and reports what happened.
    ///
    /// Returns `Missing` if no such task exists and `Completed` if the task
    /// was aborted (without polling it) or its future resolved. A pending
    /// future yields `Suspended`, unless it was not woken during the poll and
    /// no clone of its waker remains, in which case it yields `Cancelled`.
    pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
        let task = match self.tasks.get_mut(task_id.0) {
            Some(task) => task,
            None => return TaskState::Missing,
        };
        if task.is_aborted() {
            return TaskState::Completed;
        }

        let arc_waker = Arc::new(CommandWaker {
            task_id,
            ready_queue: self.ready_sender.clone(),
            parent_waker: self.waker.clone(),
            woken: AtomicBool::new(false),
        });
        let waker = Waker::from(Arc::clone(&arc_waker));
        let mut context = Context::from_waker(&waker);
        let poll_outcome = task.future.as_mut().poll(&mut context);

        // Release our own `Waker` so the strong count below only reflects
        // `arc_waker` itself plus any clones that are still held elsewhere.
        drop(waker);

        let was_woken = arc_waker.woken.load(Ordering::Acquire);
        match poll_outcome {
            Poll::Ready(()) => TaskState::Completed,
            // Pending, never woken, and ours is the only reference left:
            // nothing holds a waker that could wake this task again.
            Poll::Pending if !was_woken && Arc::strong_count(&arc_waker) < 2 => {
                TaskState::Cancelled
            }
            Poll::Pending => TaskState::Suspended,
        }
    }

    /// Moves every task waiting in the spawn queue into the task collection
    /// and queues its id on the ready queue.
    ///
    /// # Panics
    ///
    /// Panics if sending the new task id on the ready queue fails.
    pub(crate) fn spawn_new_tasks(&mut self) {
        while let Ok(task) = self.spawn_queue.try_recv() {
            let slot = self.tasks.insert(task);
            let enqueued = self.ready_sender.send(TaskId(slot));
            enqueued.expect("Command can't spawn a task, ready_queue has disconnected");
        }
    }

    /// Returns `true` if the command's abort flag has been set.
    #[must_use]
    pub fn was_aborted(&self) -> bool {
        self.aborted.load(Ordering::Acquire)
    }
}