#![doc = include_str!("../README.md")]
#![deny(rust_2018_idioms)]
#![deny(missing_docs)]
#![warn(clippy::dbg_macro)]
#![cfg_attr(not(test), warn(clippy::print_stdout))]
#![warn(clippy::missing_errors_doc)]
#![warn(clippy::missing_panics_doc)]
#![warn(clippy::todo)]
#![deny(rustdoc::broken_intra_doc_links)]
mod futures;
use crate::futures::{enter::enter, waker_ref, ArcWake, FuturesUnordered};
mod serial;
pub use serial::SerialCosync;
use std::{
collections::VecDeque,
fmt,
future::Future,
marker::PhantomData,
ops,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Weak,
},
task::{Context, Poll},
thread::{self, Thread},
};
#[derive(Debug)]
pub struct Cosync<T: ?Sized> {
pool: FuturesUnordered<FutureObject>,
queue: Arc<Mutex<IncomingQueue>>,
data: *mut Option<*mut T>,
}
impl<T: 'static + ?Sized> Cosync<T> {
pub fn new() -> Self {
Self {
pool: FuturesUnordered::new(),
queue: Arc::default(),
data: Box::into_raw(Box::new(None)),
}
}
pub fn len(&self) -> usize {
self.pool.len() + unlock_mutex(&self.queue).incoming.len()
}
pub fn is_empty(&self) -> bool {
!self.is_running_any() && unlock_mutex(&self.queue).incoming.is_empty()
}
pub fn is_running_any(&self) -> bool {
!self.pool.is_empty()
}
pub fn is_running(&self, task_id: CosyncTaskId) -> bool {
self.pool.iter().any(|v| v.1 == task_id)
}
pub fn create_queue_handle(&self) -> CosyncQueueHandle<T> {
CosyncQueueHandle {
heap_ptr: self.data,
queue: Arc::downgrade(&self.queue),
}
}
pub fn unqueue_task(&mut self, task_id: CosyncTaskId) -> bool {
let incoming = &mut unlock_mutex(&self.queue).incoming;
let Some(index) = incoming.iter().position(|future_obj| future_obj.1 == task_id) else { return false };
incoming.remove(index);
true
}
pub fn stop_running_task(&mut self, task_id: CosyncTaskId) -> bool {
for f in self.pool.iter_mut() {
if f.1 == task_id {
f.0 = Box::pin(async move {});
return true;
}
}
false
}
pub fn clear_running(&mut self) {
self.pool.clear();
}
pub fn clear_queue(&mut self) {
unlock_mutex(&self.queue).incoming.clear();
}
pub fn clear(&mut self) {
self.pool.clear();
unlock_mutex(&self.queue).incoming.clear();
}
pub fn queue<Task, Out>(&mut self, task: Task) -> CosyncTaskId
where
Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
Out: Future<Output = ()> + Send,
{
let cosync_input = CosyncInput(self.create_queue_handle());
let id = next_cosync_task_id(&self.queue);
self.pool.push(create_future_object(task, cosync_input, id));
id
}
pub fn run_blocking(&mut self, parameter: &mut T) {
run_blocking(self.data, parameter, |ctx| self.poll_pool(ctx));
}
pub fn run(&mut self, parameter: &mut T) {
run(self.data, parameter, |ctx| self.poll_pool(ctx))
}
#[deprecated(note = "use `run` instead", since = "0.3.0")]
#[doc(hidden)]
pub fn run_until_stall(&mut self, parameter: &mut T) {
self.run(parameter)
}
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
{
let mut queue = unlock_mutex(&self.queue);
for task in queue.incoming.drain(..) {
self.pool.push(task);
}
}
self.pool.increment_counter();
loop {
let pinned_pool = Pin::new(&mut self.pool);
let output = pinned_pool.poll_next(cx);
let mut queue = unlock_mutex(&self.queue);
if queue.incoming.is_empty() {
break output;
}
for task in queue.incoming.drain(..) {
self.pool.push(task);
}
}
}
}
impl<T: ?Sized + 'static> Default for Cosync<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: ?Sized> Drop for Cosync<T> {
fn drop(&mut self) {
unsafe {
let _box = Box::from_raw(self.data);
}
}
}
unsafe impl<T: Send + ?Sized> Send for Cosync<T> {}
unsafe impl<T: Sync + ?Sized> Sync for Cosync<T> {}
impl<T> Unpin for Cosync<T> {}
#[derive(Debug)]
pub struct CosyncQueueHandle<T: ?Sized> {
heap_ptr: *mut Option<*mut T>,
queue: Weak<Mutex<IncomingQueue>>,
}
impl<T: 'static + ?Sized> CosyncQueueHandle<T> {
pub fn queue<Task, Out>(&self, task: Task) -> Option<CosyncTaskId>
where
Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
Out: Future<Output = ()> + Send,
{
self.queue.upgrade().map(|queue| raw_queue(&queue, task, self.heap_ptr))
}
pub fn unqueue_task(&mut self, task_id: CosyncTaskId) -> Option<bool> {
self.queue.upgrade().map(|queue_handle| {
let incoming = &mut unlock_mutex(&queue_handle).incoming;
let Some(index) = incoming.iter().position(|future_obj| future_obj.1 == task_id) else { return false };
incoming.remove(index);
true
})
}
}
fn run_blocking<T: ?Sized + 'static>(
heap_ptr: *mut Option<*mut T>,
parameter: &mut T,
mut cback: impl FnMut(&mut Context<'_>) -> Poll<()>,
) {
unsafe {
*heap_ptr = Some(parameter as *mut _);
}
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref::waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = cback(&mut cx) {
return t;
}
let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
if !unparked {
thread::park();
thread_notify.unparked.store(false, Ordering::Release);
}
}
});
unsafe {
*heap_ptr = None;
}
}
fn run<T: ?Sized>(
heap_ptr: *mut Option<*mut T>,
parameter: &mut T,
mut cback: impl FnMut(&mut Context<'_>) -> Poll<()>,
) {
unsafe {
*heap_ptr = Some(parameter as *mut _);
}
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);
let _ = CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref::waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
cback(&mut cx)
});
unsafe {
*heap_ptr = None;
}
}
fn raw_queue<T, Task, Out>(queue: &Arc<Mutex<IncomingQueue>>, task: Task, heap_ptr: *mut Option<*mut T>) -> CosyncTaskId
where
T: ?Sized + 'static,
Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
Out: Future<Output = ()> + Send,
{
let cosync_input = CosyncInput(CosyncQueueHandle {
heap_ptr,
queue: Arc::downgrade(queue),
});
let id = next_cosync_task_id(queue);
let future_object = create_future_object(task, cosync_input, id);
let mut mutex_lock = unlock_mutex(queue);
mutex_lock.incoming.push_back(future_object);
id
}
fn next_cosync_task_id(m: &Arc<Mutex<IncomingQueue>>) -> CosyncTaskId {
let mut lock = unlock_mutex(m);
let id = CosyncTaskId(lock.counter);
lock.counter = lock.counter.wrapping_add(1);
id
}
fn create_future_object<T, Task, Out>(task: Task, cosync_input: CosyncInput<T>, id: CosyncTaskId) -> FutureObject
where
T: ?Sized + 'static,
Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
Out: Future<Output = ()> + Send,
{
let our_cb = Box::pin(async move {
task(cosync_input).await;
});
FutureObject(our_cb, id)
}
unsafe impl<T: ?Sized> Send for CosyncQueueHandle<T> {}
unsafe impl<T: ?Sized> Sync for CosyncQueueHandle<T> {}
impl<T: ?Sized> Clone for CosyncQueueHandle<T> {
fn clone(&self) -> Self {
Self {
heap_ptr: self.heap_ptr,
queue: self.queue.clone(),
}
}
}
#[derive(Debug)]
pub struct CosyncInput<T: ?Sized>(CosyncQueueHandle<T>);
impl<T: 'static + ?Sized> CosyncInput<T> {
pub fn get(&mut self) -> CosyncInputGuard<'_, T> {
assert!(Weak::strong_count(&self.0.queue) == 1, "cosync was dropped improperly");
let operation = unsafe {
let heap_ptr: *mut T = (*self.0.heap_ptr).expect("cosync was not initialized this run correctly");
&mut *heap_ptr
};
CosyncInputGuard(operation, PhantomData)
}
pub fn queue<Task, Out>(&self, task: Task) -> CosyncTaskId
where
Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
Out: Future<Output = ()> + Send,
{
self.0.queue(task).expect("cosync was dropped improperly")
}
pub fn create_queue_handle(&self) -> CosyncQueueHandle<T> {
self.0.clone()
}
}
unsafe impl<T: ?Sized> Send for CosyncInput<T> {}
unsafe impl<T: ?Sized> Sync for CosyncInput<T> {}
pub struct CosyncInputGuard<'a, T: ?Sized>(&'a mut T, PhantomData<*const u8>);
impl<'a, T: ?Sized> ops::Deref for CosyncInputGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0
}
}
impl<'a, T: ?Sized> ops::DerefMut for CosyncInputGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
}
}
struct FutureObject(Pin<Box<dyn Future<Output = ()> + 'static>>, CosyncTaskId);
impl Future for FutureObject {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
impl fmt::Debug for FutureObject {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FutureObject").finish_non_exhaustive()
}
}
#[inline]
pub fn sleep_ticks(ticks: usize) -> SleepForTick {
SleepForTick::new(ticks)
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Clone, Copy, Debug)]
#[doc(hidden)] pub struct SleepForTick(pub usize);
impl SleepForTick {
pub fn new(ticks: usize) -> Self {
Self(ticks)
}
}
impl Future for SleepForTick {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0 == 0 {
Poll::Ready(())
} else {
self.0 -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[inline]
pub fn yield_now() -> Yield {
Yield(false)
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Clone, Copy, Debug)]
#[doc(hidden)] pub struct Yield(bool);
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct CosyncTaskId(u64);
impl CosyncTaskId {
pub const DANGLING: CosyncTaskId = CosyncTaskId(u64::MAX);
}
#[derive(Debug, Default)]
struct IncomingQueue {
incoming: VecDeque<FutureObject>,
counter: u64,
}
struct ThreadNotify {
thread: Thread,
unparked: AtomicBool,
}
impl ArcWake for ThreadNotify {
fn wake_by_ref(this: &Arc<Self>) {
let unparked = this.unparked.swap(true, Ordering::Relaxed);
if !unparked {
this.thread.unpark();
}
}
}
thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
thread: thread::current(),
unparked: AtomicBool::new(false),
});
}
#[cfg(feature = "parking_lot")]
type Mutex<T> = parking_lot::Mutex<T>;
#[cfg(feature = "parking_lot")]
fn unlock_mutex<T>(mutex: &Mutex<T>) -> parking_lot::MutexGuard<'_, T> {
mutex.lock()
}
#[cfg(not(feature = "parking_lot"))]
type Mutex<T> = std::sync::Mutex<T>;
#[cfg(not(feature = "parking_lot"))]
fn unlock_mutex<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
mutex.lock().unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
static_assertions::assert_not_impl_all!(CosyncInputGuard<'_, i32>: Send, Sync);
#[test]
fn ordering() {
let mut cosync = Cosync::new();
let mut value = 0;
cosync.queue(|_i| async move {
println!("actual task body!");
});
cosync.run(&mut value);
}
#[test]
#[allow(clippy::needless_late_init)]
fn pool_is_sequential() {
let mut value;
let mut executor: Cosync<i32> = Cosync::new();
executor.queue(move |mut input| async move {
let mut input = input.get();
assert_eq!(*input, 10);
*input = 10;
});
executor.queue(move |mut input| async move {
assert_eq!(*input.get(), 10);
yield_now().await;
let input = &mut *input.get();
assert_eq!(*input, 30);
*input = 0;
});
value = 10;
executor.run(&mut value);
value = 30;
executor.run(&mut value);
assert_eq!(value, 0);
}
#[test]
#[allow(clippy::needless_late_init)]
fn multi_pool_is_parallel() {
let mut value;
let mut executor: Cosync<(i32, i32)> = Cosync::new();
executor.queue(move |mut input| async move {
let mut input = input.get();
assert_eq!((*input).0, 10);
(*input).0 = 30;
});
executor.queue(move |mut input| async move {
let mut input = input.get();
assert_eq!((*input).1, 20);
(*input).1 = 20;
});
value = (10, 20);
executor.run(&mut value);
assert_eq!(value, (30, 20));
}
#[test]
fn run_stalls() {
let mut cosync = Cosync::new();
cosync.queue(move |mut input| async move {
*input.get() = 10;
yield_now().await;
*input.get() = 20;
});
let mut value = 0;
cosync.run(&mut value);
assert_eq!(value, 10);
cosync.run(&mut value);
assert_eq!(value, 20);
}
#[test]
fn run_multiple() {
let mut value = (10, 20);
let mut executor: Cosync<(i32, i32)> = Cosync::new();
executor.queue(move |mut input| async move {
{
let mut input_guard = input.get();
assert_eq!((*input_guard).0, 10);
(*input_guard).0 = 30;
}
yield_now().await;
{
let mut input_guard = input.get();
(*input_guard).0 = 40;
}
});
executor.queue(move |mut input| async move {
{
let mut input = input.get();
assert_eq!((*input).1, 20);
(*input).1 = 20;
}
yield_now().await;
{
let mut input = input.get();
(*input).1 = 40;
}
});
executor.run(&mut value);
assert_eq!(value, (30, 20));
executor.run(&mut value);
assert_eq!(value, (40, 40));
}
#[test]
fn run_concurrent_weird() {
let mut value = (10, 20, 30);
let mut executor: Cosync<(i32, i32, i32)> = Cosync::new();
executor.queue(move |mut input| async move {
{
let mut input = input.get();
assert_eq!(input.2, 30);
input.2 = 20;
}
yield_now().await;
input.get().2 = 30;
yield_now().await;
input.get().2 = 40;
});
executor.queue(move |mut input| async move {
{
let mut input_guard = input.get();
assert_eq!((*input_guard).0, 10);
(*input_guard).0 = 30;
}
yield_now().await;
{
let mut input_guard = input.get();
(*input_guard).0 = 40;
}
});
executor.queue(move |mut input| async move {
{
let mut input = input.get();
assert_eq!((*input).1, 20);
(*input).1 = 20;
}
yield_now().await;
{
let mut input = input.get();
(*input).1 = 40;
}
});
executor.run(&mut value);
assert_eq!(value, (30, 20, 20));
executor.run(&mut value);
assert_eq!(value, (40, 40, 30));
executor.run(&mut value);
assert_eq!(value, (40, 40, 40));
}
#[test]
fn pool_remains_sequential_multi() {
let mut value = 0;
let mut executor: Cosync<i32> = Cosync::new();
executor.queue(move |mut input| async move {
*input.get() = 10;
input.queue(|mut input| async move {
println!("running this guy!!");
*input.get() = 20;
input.queue(|mut input| async move {
println!("running final guy!!");
*input.get() = 30;
});
});
});
executor.run(&mut value);
assert_eq!(value, 30);
}
#[test]
fn multi_sequential_on_spawn() {
struct TestMe {
one: i32,
two: i32,
}
let mut value = TestMe { one: 0, two: 0 };
let mut executor: Cosync<TestMe> = Cosync::new();
executor.queue(move |mut input| async move {
input.get().one = 10;
input.queue(|mut input| async move {
println!("running?/");
input.get().two = 20;
});
yield_now().await;
});
executor.run(&mut value);
assert_eq!(value.one, 10);
assert_eq!(value.two, 20);
}
#[test]
#[allow(clippy::needless_late_init)]
fn pool_is_still_sequential() {
let mut value;
let mut executor: Cosync<i32> = Cosync::new();
executor.queue(move |mut input| async move {
*input.get() = 10;
input.queue(move |mut input| async move {
assert_eq!(*input.get(), 20);
*input.get() = 30;
});
});
executor.queue(move |mut input| async move {
*input.get() = 20;
});
value = 0;
executor.run(&mut value);
assert_eq!(value, 30);
}
#[test]
#[allow(clippy::needless_late_init)]
fn cosync_can_be_moved() {
let mut value;
let mut executor: Cosync<i32> = Cosync::new();
executor.queue(move |mut input| async move {
*input.get() = 10;
yield_now().await;
*input.get() = 20;
});
value = 0;
executor.run(&mut value);
assert_eq!(value, 10);
let mut executor = Box::new(executor);
executor.run(&mut value);
assert_eq!(value, 20);
}
#[test]
#[should_panic(expected = "cosync was dropped improperly")]
fn ub_on_move_is_prevented() {
let (sndr, rx) = std::sync::mpsc::channel();
let mut executor: Cosync<i32> = Cosync::new();
executor.queue(move |input| async move {
let sndr: std::sync::mpsc::Sender<_> = sndr;
sndr.send(input).unwrap();
});
let mut value = 0;
executor.run_blocking(&mut value);
drop(executor);
let mut v = rx.recv().unwrap();
*v.get() = 20;
}
#[test]
#[should_panic(expected = "cosync was not initialized this run correctly")]
fn ub2_on_move_is_prevented() {
let (sndr, rx) = std::sync::mpsc::channel();
let mut executor: Cosync<i32> = Cosync::new();
executor.queue(move |input| async move {
let sndr: std::sync::mpsc::Sender<_> = sndr;
yield_now().await;
sndr.send(input).unwrap();
});
let mut value = 0;
executor.run_blocking(&mut value);
let mut v = rx.recv().unwrap();
*v.get() = 20;
}
#[test]
#[cfg_attr(miri, ignore = "miri cannot handle threading yet")]
fn threading() {
let mut cosync = Cosync::new();
let handler = cosync.create_queue_handle();
std::thread::spawn(move || {
handler.queue(|mut input| async move {
*input.get() = 20;
});
})
.join()
.unwrap();
let mut value = 1;
cosync.run_blocking(&mut value);
assert_eq!(value, 20);
}
#[test]
fn dynamic_dispatch() {
trait DynDispatch {
fn test(&self) -> i32;
}
impl DynDispatch for i32 {
fn test(&self) -> i32 {
*self
}
}
impl DynDispatch for &'static str {
fn test(&self) -> i32 {
self.parse().unwrap()
}
}
let mut cosync: Cosync<dyn DynDispatch> = Cosync::new();
cosync.queue(|mut input: CosyncInput<dyn DynDispatch>| async move {
{
let inner: &mut dyn DynDispatch = &mut *input.get();
assert_eq!(inner.test(), 3);
}
yield_now().await;
{
let inner: &mut dyn DynDispatch = &mut *input.get();
assert_eq!(inner.test(), 3);
}
});
cosync.run(&mut 3);
cosync.run(&mut "3");
}
#[test]
fn unsized_type() {
let mut cosync: Cosync<str> = Cosync::new();
cosync.queue(|mut input| async move {
let input_guard = input.get();
let inner_str: &str = &input_guard;
println!("inner str = {}", inner_str);
});
}
#[test]
fn can_move_non_copy() {
let mut cosync: Cosync<i32> = Cosync::new();
let my_vec = vec![10];
cosync.queue(|_input| async move {
let mut vec = my_vec;
vec.push(10);
assert_eq!(*vec, [10, 10]);
});
}
#[test]
fn task_id_numbers() {
let mut cosync: Cosync<i32> = Cosync::new();
let id = cosync.queue(|_input| async {});
assert_eq!(id, CosyncTaskId(0));
let second_task = cosync.queue(|_input| async {});
assert!(second_task > id);
}
#[test]
fn stop_a_running_task() {
let mut cosync: Cosync<i32> = Cosync::new();
let id = cosync.queue(|mut input| async move {
*input.get() += 1;
yield_now().await;
*input.get() += 1;
});
let mut value = 0;
cosync.run(&mut value);
assert_eq!(value, 1);
let success = cosync.stop_running_task(id);
assert!(success);
cosync.run(&mut value);
assert_eq!(value, 1);
}
#[test]
fn sleep_ticker() {
let mut cosync: Cosync<i32> = Cosync::new();
let mut value = 0;
cosync.queue(|mut input| async move {
sleep_ticks(5).await;
*input.get() = 10;
});
for _ in 0..5 {
cosync.run(&mut value);
assert_eq!(value, 0);
}
cosync.run(&mut value);
assert_eq!(value, 10);
}
}