use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use crate::{
create_future_object, next_cosync_task_id, unlock_mutex, Cosync, CosyncInput, CosyncQueueHandle, CosyncTaskId,
};
/// A thin wrapper around `Cosync` that feeds queued tasks to the underlying
/// pool one at a time.
///
/// A task is moved from the incoming queue into the pool only after polling
/// the pool has reported `Poll::Ready`, so queued tasks are started in the
/// order they were queued.
#[derive(Debug)]
pub struct SerialCosync<T>(Cosync<T>)
where
    T: ?Sized;
impl<T: ?Sized + 'static> SerialCosync<T> {
    /// Creates a `SerialCosync` backed by a freshly constructed `Cosync`.
    pub fn new() -> Self {
        Self(Cosync::new())
    }

    /// Returns how many tasks this executor is holding: one for the running
    /// task (if there is one) plus every task waiting in the incoming queue.
    pub fn len(&self) -> usize {
        let running = usize::from(self.is_running_any());
        let waiting = unlock_mutex(&self.0.queue).incoming.len();
        running + waiting
    }

    /// Reports whether the wrapped `Cosync` considers itself empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Reports whether the wrapped `Cosync` currently has a task running.
    pub fn is_running_any(&self) -> bool {
        self.0.is_running_any()
    }

    /// Returns the id of the first task in the pool, or `None` when the pool
    /// holds no task.
    pub fn current_task_id(&self) -> Option<CosyncTaskId> {
        let front = self.0.pool.iter().next()?;
        Some(front.1)
    }

    /// Creates a `CosyncQueueHandle` for the wrapped `Cosync`.
    pub fn create_queue_handle(&self) -> CosyncQueueHandle<T> {
        self.0.create_queue_handle()
    }

    /// Removes the task identified by `task_id` from the incoming queue.
    ///
    /// Returns `true` when a matching task was found and removed, `false`
    /// otherwise. Only the incoming queue is searched; a task that has already
    /// been moved into the pool is not affected.
    pub fn unqueue_task(&mut self, task_id: CosyncTaskId) -> bool {
        let mut queue = unlock_mutex(&self.0.queue);
        match queue.incoming.iter().position(|queued| queued.1 == task_id) {
            Some(index) => {
                queue.incoming.remove(index);
                true
            }
            None => false,
        }
    }

    /// Clears the pool, discarding whatever task it holds. Tasks still in the
    /// incoming queue are left in place.
    pub fn stop_running_task(&mut self) {
        self.0.pool.clear();
    }

    /// Discards every task waiting in the incoming queue. The pool is left
    /// untouched.
    pub fn clear_queue(&mut self) {
        let mut queue = unlock_mutex(&self.0.queue);
        queue.incoming.clear();
    }

    /// Delegates to `Cosync::clear` on the wrapped executor.
    pub fn clear(&mut self) {
        self.0.clear();
    }

    /// Appends `task` to the back of the incoming queue and returns the id
    /// assigned to it.
    ///
    /// The task receives a `CosyncInput` built from a new queue handle. This
    /// method only enqueues; the task is picked up by a later call to
    /// [`run`](Self::run) or [`run_blocking`](Self::run_blocking).
    pub fn queue<Task, Out>(&mut self, task: Task) -> CosyncTaskId
    where
        Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
        Out: Future<Output = ()> + Send,
    {
        let cosync_input = CosyncInput(self.create_queue_handle());
        let id = next_cosync_task_id(&self.0.queue);

        let mut queue = unlock_mutex(&self.0.queue);
        let future_object = create_future_object(task, cosync_input, id);
        queue.incoming.push_back(future_object);

        id
    }

    /// Drives the executor through `super::run_blocking`, handing `parameter`
    /// to the tasks and polling them serially.
    pub fn run_blocking(&mut self, parameter: &mut T) {
        // Copy the data handle out before the closure borrows `self.0` mutably.
        let data = self.0.data;
        super::run_blocking(data, parameter, |cx| Self::poll_pool(&mut self.0, cx));
    }

    /// Drives the executor through `super::run`, handing `parameter` to the
    /// tasks and polling them serially.
    pub fn run(&mut self, parameter: &mut T) {
        // Copy the data handle out before the closure borrows `self.0` mutably.
        let data = self.0.data;
        super::run(data, parameter, |cx| Self::poll_pool(&mut self.0, cx))
    }

    /// Deprecated alias that simply forwards to [`run`](Self::run).
    #[deprecated(note = "use `run` instead", since = "0.3.0")]
    #[doc(hidden)]
    pub fn run_until_stall(&mut self, parameter: &mut T) {
        self.run(parameter)
    }

    /// Polls the pool, refilling it from the incoming queue one task at a time.
    ///
    /// Returns `Poll::Pending` as soon as the pool reports pending, and
    /// `Poll::Ready(())` once the pool reports ready while the incoming queue
    /// is empty.
    fn poll_pool(cosync: &mut Cosync<T>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            cosync.pool.increment_counter();

            if Pin::new(&mut cosync.pool).poll_next(cx).is_pending() {
                return Poll::Pending;
            }

            // The pool reported ready: start the next queued task, or finish
            // when nothing is left. The guard stays alive across the push.
            let mut queue = unlock_mutex(&cosync.queue);
            match queue.incoming.pop_front() {
                Some(task) => {
                    cosync.pool.push(task);
                }
                None => return Poll::Ready(()),
            }
        }
    }
}
impl<T: ?Sized + 'static> Default for SerialCosync<T> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{sleep_ticks, yield_now};

    /// Queues two tasks; the second asserts that it observes the value
    /// written at the very end of the first.
    #[test]
    fn pool_remains_sequential() {
        let mut serial: SerialCosync<i32> = SerialCosync::new();

        serial.queue(move |mut input| async move {
            *input.get() = 10;
            sleep_ticks(100).await;
            *input.get() = 20;
        });
        serial.queue(move |mut input| async move {
            assert_eq!(*input.get(), 20);
        });

        let mut value = 0;
        serial.run(&mut value);
    }

    /// Stopping the running task prevents its remaining half from executing,
    /// and a queued task can be removed again by its id.
    #[test]
    fn cancelling_a_task() {
        let mut serial: SerialCosync<i32> = SerialCosync::new();
        serial.queue(|mut input| async move {
            *input.get() += 1;
            yield_now().await;
            *input.get() += 1;
        });

        let mut value = 0;

        // First run: the task increments once and then yields.
        serial.run(&mut value);
        assert_eq!(value, 1);

        // After cancellation the second increment never happens.
        serial.stop_running_task();
        serial.run(&mut value);
        assert_eq!(value, 1);
        assert!(serial.is_empty());

        // A freshly queued task counts towards `len` until it is unqueued.
        let id = serial.queue(|_| async {});
        assert_eq!(serial.len(), 1);
        assert!(serial.unqueue_task(id));
        assert!(serial.is_empty());
    }
}