#[cfg(feature = "alloc")]
use crate::{Box, TaskWaker, VecDeque};
use crate::{Debug, Future, OptRes, Pin, TaskContext, TaskPoll, serr, sok};
#[doc = crate::_tags!(concurrency runtime)]
/// The coroutine-side handle used to yield results to whoever polls the coroutine.
#[doc = crate::_doc_location!("work/future")]
///
/// A coroutine body calls `yield_ok` or `yield_err` on its worker and awaits the
/// returned [`CoroWork`]; the worker keeps the yielded result and the
/// running/halted status that the future consults when it is polled.
#[derive(Clone, Copy, Debug)]
pub struct CoroWorker<T, E> {
/// Whether the coroutine is currently running or suspended at a yield point.
status: CoroWorkerStatus,
/// The result stored by the latest yield, taken out when that yield completes.
result: OptRes<T, E>,
}
/// Execution state of a [`CoroWorker`], toggled on every poll of a [`CoroWork`].
#[derive(Clone, Copy, Debug)]
enum CoroWorkerStatus {
/// Suspended at a yield point: the next poll resumes the coroutine and
/// delivers the stored result.
Halted,
/// Executing: the next poll of a [`CoroWork`] suspends the coroutine.
Running,
}
impl<T, E> CoroWorker<T, E> {
    /// Creates a worker in the running state with no stored result.
    // In this file the only caller is `CoroManager::push`, which is gated on
    // the `alloc` feature, hence the lint allowance for builds without it.
    #[allow(unused)]
    const fn new() -> Self {
        Self { status: CoroWorkerStatus::Running, result: None }
    }

    /// Stores `value` as a success result and returns the future that yields it.
    ///
    /// The coroutine is only suspended once the returned [`CoroWork`] is
    /// awaited; dropping it without awaiting skips the yield.
    #[must_use = "the yield only takes place when the returned future is awaited"]
    pub fn yield_ok(&mut self, value: T) -> CoroWork<'_, T, E> {
        self.result = sok(value);
        CoroWork { cor: self }
    }

    /// Stores `error` as a failure result and returns the future that yields it.
    ///
    /// The coroutine is only suspended once the returned [`CoroWork`] is
    /// awaited; dropping it without awaiting skips the yield.
    #[must_use = "the yield only takes place when the returned future is awaited"]
    pub fn yield_err(&mut self, error: E) -> CoroWork<'_, T, E> {
        self.result = serr(error);
        CoroWork { cor: self }
    }
}
#[doc = crate::_tags!(concurrency runtime)]
/// The future returned by the yield methods of [`CoroWorker`].
#[doc = crate::_doc_location!("work/future")]
///
/// Its first poll suspends the coroutine by returning `Pending`; the following
/// poll resumes it and resolves to the result stored in the worker.
#[derive(Debug)]
pub struct CoroWork<'a, T, E> {
/// The worker whose status and stored result drive this future.
cor: &'a mut CoroWorker<T, E>,
}
impl<T, E> Future for CoroWork<'_, T, E> {
type Output = OptRes<T, E>;
fn poll(mut self: Pin<&mut Self>, _cx: &mut TaskContext) -> TaskPoll<OptRes<T, E>> {
match self.cor.status {
CoroWorkerStatus::Halted => {
self.cor.status = CoroWorkerStatus::Running;
if let Some(result) = self.cor.result.take() {
match result {
Err(error) => TaskPoll::Ready(serr(error)),
Ok(value) => TaskPoll::Ready(sok(value)),
}
} else {
unreachable!();
}
}
CoroWorkerStatus::Running => {
self.cor.status = CoroWorkerStatus::Halted;
TaskPoll::Pending
}
}
}
}
#[doc = crate::_tags!(concurrency runtime)]
/// A queue of boxed coroutine futures that are polled in round-robin order.
#[doc = crate::_doc_location!("work/future")]
///
/// # Examples
/// ```
#[doc = include_str!("../../../../examples/work/coro_manager.rs")]
/// ```
#[cfg(feature = "alloc")]
#[cfg_attr(nightly_doc, doc(cfg(feature = "alloc")))]
#[allow(missing_debug_implementations, reason = "unsatisfied trait bounds")]
pub struct CoroManager<T, E> {
    /// Pending coroutines; the front one is polled next.
    #[allow(clippy::type_complexity)]
    coros: VecDeque<Pin<Box<dyn Future<Output = OptRes<T, E>>>>>,
}

// Implemented by hand because `#[derive(Default)]` would add `T: Default` and
// `E: Default` bounds, which an empty queue does not need.
#[cfg(feature = "alloc")]
impl<T, E> Default for CoroManager<T, E> {
    /// Returns a manager with an empty coroutine queue.
    fn default() -> Self {
        Self { coros: VecDeque::new() }
    }
}
#[cfg(feature = "alloc")]
impl<T, E: 'static + Debug> CoroManager<T, E> {
    /// Returns a manager whose coroutine queue is empty.
    pub fn new() -> Self {
        Self { coros: VecDeque::new() }
    }

    /// Builds a coroutine and appends it to the back of the queue.
    ///
    /// `closure` receives a fresh [`CoroWorker`] and returns the future that
    /// forms the coroutine body; nothing is polled until [`run`][Self::run].
    pub fn push<C, F>(&mut self, closure: C)
    where
        C: FnOnce(CoroWorker<T, E>) -> F,
        F: Future<Output = OptRes<T, E>> + 'static,
    {
        let worker = CoroWorker::new();
        let coroutine = Box::pin(closure(worker));
        self.coros.push_back(coroutine);
    }

    /// Polls the queued coroutines in turn until every one has completed.
    ///
    /// A coroutine that is still pending goes to the back of the queue; one
    /// that is ready is dropped together with its final result.
    pub fn run(&mut self) {
        let noop = TaskWaker::noop();
        let mut cx = TaskContext::from_waker(noop);
        while let Some(mut coroutine) = self.coros.pop_front() {
            if coroutine.as_mut().poll(&mut cx).is_pending() {
                self.coros.push_back(coroutine);
            }
        }
    }
}