1#![doc = include_str!("../README.md")]
2#![deny(rust_2018_idioms)]
3#![deny(missing_docs)]
4#![deny(rustdoc::all)]
5
6mod futures;
9use crate::futures::{enter::enter, waker_ref, ArcWake, FuturesUnordered};
10
11use std::{
12 collections::VecDeque,
13 fmt,
14 future::Future,
15 marker::PhantomData,
16 ops,
17 pin::Pin,
18 ptr::NonNull,
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc, Mutex, Weak,
22 },
23 task::{Context, Poll},
24 thread::{self, Thread},
25};
26
27#[derive(Debug)]
37pub struct Cosync<T: ?Sized> {
38 pool: FuturesUnordered<FutureObject>,
39 incoming: Arc<Mutex<VecDeque<FutureObject>>>,
40 data: Box<Option<NonNull<T>>>,
41 kill_box: Arc<()>,
42}
43
44impl<T: 'static + ?Sized> Cosync<T> {
45 pub fn new() -> Self {
47 Self {
48 pool: FuturesUnordered::new(),
49 incoming: Default::default(),
50 data: Box::new(None),
51 kill_box: Arc::new(()),
52 }
53 }
54
55 pub fn len(&self) -> usize {
61 let one = if self.is_executing() { 1 } else { 0 };
62
63 one + self.incoming.lock().unwrap().len()
64 }
65
66 pub fn is_empty(&self) -> bool {
68 !self.is_executing() && self.incoming.lock().unwrap().is_empty()
69 }
70
71 pub fn is_executing(&self) -> bool {
74 !self.pool.is_empty()
75 }
76
77 pub fn create_queue_handle(&self) -> CosyncQueueHandle<T> {
79 let heap_ptr = &*self.data as *const Option<_>;
80
81 CosyncQueueHandle {
82 heap_ptr,
83 incoming: self.incoming.clone(),
84 kill_box: Arc::downgrade(&self.kill_box),
85 }
86 }
87
88 pub fn queue<Task, Out>(&mut self, task: Task)
90 where
91 Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
92 Out: Future<Output = ()> + Send,
93 {
94 let queue_handle = self.create_queue_handle();
95
96 queue_handle.queue(task)
97 }
98
99 pub fn run_blocking(&mut self, parameter: &mut T) {
118 unsafe {
120 *self.data = Some(NonNull::new_unchecked(parameter as *mut _));
121 }
122
123 run_executor(|cx| self.poll_pool(cx));
124
125 *self.data = None;
127 }
128
129 pub fn run_until_stall(&mut self, parameter: &mut T) {
159 unsafe {
161 *self.data = Some(NonNull::new_unchecked(parameter as *mut _));
162 }
163
164 poll_executor(|ctx| {
165 let _output = self.poll_pool(ctx);
166 });
167
168 *self.data = None;
170 }
171
172 fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
175 loop {
177 let ret = self.poll_pool_once(cx);
178
179 match ret {
181 Poll::Pending => return Poll::Pending,
182 Poll::Ready(None) => return Poll::Ready(()),
183 _ => {}
184 }
185 }
186 }
187
188 fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
190 if self.pool.is_empty() {
192 if let Some(task) = self.incoming.lock().unwrap().pop_front() {
193 self.pool.push(task)
194 }
195 }
196
197 Pin::new(&mut self.pool).poll_next(cx)
199 }
200}
201
202#[derive(Debug)]
224pub struct CosyncQueueHandle<T: ?Sized> {
225 heap_ptr: *const Option<NonNull<T>>,
226 incoming: Arc<Mutex<VecDeque<FutureObject>>>,
227 kill_box: Weak<()>,
228}
229
230impl<T: 'static + ?Sized> CosyncQueueHandle<T> {
231 pub fn queue<Task, Out>(&self, task: Task)
233 where
234 Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
235 Out: Future<Output = ()> + Send,
236 {
237 queue_task(task, self.kill_box.clone(), self.heap_ptr, &self.incoming);
238 }
239}
240
241#[allow(clippy::non_send_fields_in_send_ty)]
247unsafe impl<T: ?Sized> Send for CosyncQueueHandle<T> {}
248unsafe impl<T: ?Sized> Sync for CosyncQueueHandle<T> {}
249
250impl<T: ?Sized> Clone for CosyncQueueHandle<T> {
251 fn clone(&self) -> Self {
252 Self {
253 heap_ptr: self.heap_ptr,
254 incoming: self.incoming.clone(),
255 kill_box: self.kill_box.clone(),
256 }
257 }
258}
259
260#[derive(Debug)]
265pub struct CosyncInput<T: ?Sized>(CosyncQueueHandle<T>);
266
267impl<T: 'static + ?Sized> CosyncInput<T> {
268 pub fn get(&mut self) -> CosyncInputGuard<'_, T> {
270 assert!(
273 Weak::strong_count(&self.0.kill_box) == 1,
274 "cosync was dropped improperly"
275 );
276
277 let o = unsafe {
280 (&*self.0.heap_ptr)
281 .expect("cosync was not initialized this run correctly")
282 .as_mut()
283 };
284
285 CosyncInputGuard(o, PhantomData)
286 }
287
288 pub fn queue<Task, Out>(&self, task: Task)
290 where
291 Task: Fn(CosyncInput<T>) -> Out + Send + 'static,
292 Out: Future<Output = ()> + Send,
293 {
294 self.0.queue(task)
295 }
296
297 pub fn create_queue_handle(&self) -> CosyncQueueHandle<T> {
299 self.0.clone()
300 }
301}
302
303#[allow(clippy::non_send_fields_in_send_ty)]
308unsafe impl<T: ?Sized> Send for CosyncInput<T> {}
309unsafe impl<T: ?Sized> Sync for CosyncInput<T> {}
310
/// A guard that dereferences to the value shared with the running task.
///
/// The `PhantomData<*const u8>` marker keeps the guard from being `Send` or `Sync`.
pub struct CosyncInputGuard<'a, T>(&'a mut T, PhantomData<*const u8>)
where
    T: ?Sized;

impl<'a, T: ?Sized> ops::Deref for CosyncInputGuard<'a, T> {
    type Target = T;

    /// Borrows the guarded value immutably.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

impl<'a, T: ?Sized> ops::DerefMut for CosyncInputGuard<'a, T> {
    /// Borrows the guarded value mutably.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}
330
331impl<T: 'static> Default for Cosync<T> {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
/// A type-erased, pinned and boxed task future.
struct FutureObject(Pin<Box<dyn Future<Output = ()> + 'static>>);

impl Future for FutureObject {
    type Output = ();

    /// Forwards the poll to the boxed future.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The wrapper is `Unpin` because the inner future is already pinned in its box.
        let this = self.get_mut();
        this.0.as_mut().poll(cx)
    }
}

impl fmt::Debug for FutureObject {
    /// Prints only the type name; the erased future cannot be inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("FutureObject")
    }
}
351
352pub(crate) struct ThreadNotify {
353 pub thread: Thread,
355 pub unparked: AtomicBool,
361}
362
363impl ArcWake for ThreadNotify {
364 fn wake_by_ref(this: &Arc<Self>) {
365 let unparked = this.unparked.swap(true, Ordering::Relaxed);
367 if !unparked {
368 this.thread.unpark();
374 }
375 }
376}
377
378thread_local! {
379 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
380 thread: thread::current(),
381 unparked: AtomicBool::new(false),
382 });
383}
384
385fn run_executor<T, F>(mut work_on_future: F) -> T
388where
389 F: FnMut(&mut Context<'_>) -> Poll<T>,
390{
391 let _enter = enter().expect(
392 "cannot execute `LocalPool` executor from within \
393 another executor",
394 );
395
396 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
397 let waker = waker_ref::waker_ref(thread_notify);
398 let mut cx = Context::from_waker(&waker);
399 loop {
400 if let Poll::Ready(t) = work_on_future(&mut cx) {
401 return t;
402 }
403 let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
405 if !unparked {
406 thread::park();
410 thread_notify.unparked.store(false, Ordering::Release);
414 }
415 }
416 })
417}
418
419fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
420 let _enter = enter().expect(
421 "cannot execute `LocalPool` executor from within \
422 another executor",
423 );
424
425 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
426 let waker = waker_ref::waker_ref(thread_notify);
427 let mut cx = Context::from_waker(&waker);
428 f(&mut cx)
429 })
430}
431
432fn queue_task<T: 'static + ?Sized, Task, Out>(
434 task: Task,
435 kill_box: Weak<()>,
436 heap_ptr: *const Option<NonNull<T>>,
437 incoming: &Arc<Mutex<VecDeque<FutureObject>>>,
438) where
439 Task: FnOnce(CosyncInput<T>) -> Out + Send + 'static,
440 Out: Future<Output = ()> + Send,
441{
442 let task = task;
444 let sec = CosyncInput(CosyncQueueHandle {
445 heap_ptr,
446 incoming: incoming.clone(),
447 kill_box,
448 });
449
450 let our_cb = Box::pin(async move {
451 task(sec).await;
452 });
453
454 incoming.lock().unwrap().push_back(FutureObject(our_cb));
455}
456
457pub fn sleep_ticks(ticks: usize) -> SleepForTick {
463 SleepForTick::new(ticks)
464}
465
/// A future that stays pending for a fixed number of polls.
///
/// The wrapped value is the number of `Pending` results still to be returned.
#[derive(Clone, Copy, Debug)]
#[doc(hidden)]
pub struct SleepForTick(pub usize);

impl SleepForTick {
    /// Creates a future that is pending for `ticks` polls.
    pub fn new(ticks: usize) -> Self {
        SleepForTick(ticks)
    }
}

impl Future for SleepForTick {
    type Output = ();

    /// Completes once the counter has reached zero. Otherwise decrements the counter,
    /// wakes the task immediately so it gets polled again, and reports `Pending`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.checked_sub(1) {
            None => Poll::Ready(()),
            Some(remaining) => {
                self.0 = remaining;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
495
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time check that the guard cannot be sent to another thread.
    static_assertions::assert_not_impl_all!(CosyncInputGuard<'_, i32>: Send);

    #[test]
    fn ordering() {
        let mut cosync = Cosync::new();

        let mut value = 0;
        cosync.queue(|_i| async move {
            println!("actual task body!");
        });
        cosync.run_until_stall(&mut value);
    }

    #[test]
    #[allow(clippy::needless_late_init)]
    fn pool_is_sequential() {
        let mut value;

        let mut executor: Cosync<i32> = Cosync::new();
        executor.queue(move |mut input| async move {
            let mut input = input.get();

            assert_eq!(*input, 10);
            *input = 10;
        });

        executor.queue(move |mut input| async move {
            assert_eq!(*input.get(), 10);

            let sleep = SleepForTick(1);
            sleep.await;

            let input = &mut *input.get();
            assert_eq!(*input, 30);
            *input = 0;
        });

        value = 10;
        executor.run_until_stall(&mut value);
        value = 30;
        executor.run_until_stall(&mut value);
        assert_eq!(value, 0);
    }

    #[test]
    fn run_until_stalled_stalls() {
        let mut cosync = Cosync::new();

        cosync.queue(move |mut input| async move {
            *input.get() = 10;
            sleep_ticks(1).await;

            *input.get() = 20;
        });

        let mut value = 0;
        cosync.run_until_stall(&mut value);
        assert_eq!(value, 10);
        cosync.run_until_stall(&mut value);
        assert_eq!(value, 20);
    }

    #[test]
    #[allow(clippy::needless_late_init)]
    fn pool_remains_sequential() {
        let mut value;

        let mut executor: Cosync<i32> = Cosync::new();
        executor.queue(move |mut input| async move {
            println!("starting task 1");
            *input.get() = 10;

            sleep_ticks(100).await;

            *input.get() = 20;
        });

        executor.queue(move |mut input| async move {
            assert_eq!(*input.get(), 20);
        });

        value = 0;
        // The first run stalls inside task 1, before task 2 may start.
        executor.run_until_stall(&mut value);
        assert_eq!(value, 10);

        // Drive everything to completion so that task 2's assertion actually executes.
        executor.run_blocking(&mut value);
        assert_eq!(value, 20);
        assert!(executor.is_empty());
    }

    #[test]
    #[allow(clippy::needless_late_init)]
    fn pool_is_still_sequential() {
        let mut value;

        let mut executor: Cosync<i32> = Cosync::new();
        executor.queue(move |mut input| async move {
            println!("starting task 1");
            *input.get() = 10;

            input.queue(move |mut input| async move {
                println!("starting task 3");
                assert_eq!(*input.get(), 20);

                *input.get() = 30;
            });
        });

        executor.queue(move |mut input| async move {
            println!("starting task 2");
            *input.get() = 20;
        });

        value = 0;
        executor.run_until_stall(&mut value);
        assert_eq!(value, 30);
    }

    #[test]
    #[allow(clippy::needless_late_init)]
    fn cosync_can_be_moved() {
        let mut value;

        let mut executor: Cosync<i32> = Cosync::new();
        executor.queue(move |mut input| async move {
            println!("starting task 1");
            *input.get() = 10;

            sleep_ticks(1).await;

            *input.get() = 20;
        });

        value = 0;
        executor.run_until_stall(&mut value);
        assert_eq!(value, 10);

        // Moving the executor must not invalidate the pointer held by the running task.
        let mut executor = Box::new(executor);
        executor.run_until_stall(&mut value);

        assert_eq!(value, 20);
    }

    #[test]
    #[should_panic(expected = "cosync was dropped improperly")]
    fn ub_on_move_is_prevented() {
        let (sndr, rx) = std::sync::mpsc::channel();
        let mut executor: Cosync<i32> = Cosync::new();

        executor.queue(move |input| async move {
            let sndr: std::sync::mpsc::Sender<_> = sndr;
            sndr.send(input).unwrap();
        });

        let mut value = 0;
        executor.run_blocking(&mut value);
        drop(executor);

        // The input escaped its task and outlived the executor; `get` must panic.
        let mut v = rx.recv().unwrap();
        *v.get() = 20;
    }

    #[test]
    fn threading() {
        let mut cosync = Cosync::new();
        let handler = cosync.create_queue_handle();

        std::thread::spawn(move || {
            handler.queue(|mut input| async move {
                *input.get() = 20;
            });
        })
        .join()
        .unwrap();

        let mut value = 1;
        cosync.run_blocking(&mut value);
        assert_eq!(value, 20);
    }

    #[test]
    fn trybuild() {
        let t = trybuild::TestCases::new();
        t.compile_fail("tests/try_build/*.rs");
    }

    #[test]
    fn dynamic_dispatch() {
        trait DynDispatch {
            fn test(&self) -> i32;
        }

        impl DynDispatch for i32 {
            fn test(&self) -> i32 {
                *self
            }
        }

        impl DynDispatch for &'static str {
            fn test(&self) -> i32 {
                self.parse().unwrap()
            }
        }

        let mut cosync: Cosync<dyn DynDispatch> = Cosync::new();
        cosync.queue(|mut input: CosyncInput<dyn DynDispatch>| async move {
            {
                let inner: &mut dyn DynDispatch = &mut *input.get();
                assert_eq!(inner.test(), 3);
            }

            sleep_ticks(1).await;

            {
                let inner: &mut dyn DynDispatch = &mut *input.get();
                assert_eq!(inner.test(), 3);
            }
        });

        cosync.run_until_stall(&mut 3);
        cosync.run_until_stall(&mut "3");
    }

    #[test]
    fn unsized_type() {
        let mut cosync: Cosync<str> = Cosync::new();

        cosync.queue(|mut input| async move {
            let input_guard = input.get();
            let inner_str: &str = &input_guard;
            println!("inner str = {}", inner_str);
            assert_eq!(inner_str, "hello");
        });

        // Run the executor; otherwise the task body above is never executed.
        let mut text = String::from("hello");
        cosync.run_until_stall(text.as_mut_str());
        assert!(cosync.is_empty());
    }

    #[test]
    fn can_move_non_copy() {
        let mut cosync: Cosync<i32> = Cosync::new();

        let my_vec = vec![10];

        cosync.queue(|_input| async move {
            let mut vec = my_vec;
            vec.push(10);

            assert_eq!(*vec, [10, 10]);
        });

        // Run the executor; otherwise the assertion above is never evaluated.
        cosync.run_until_stall(&mut 0);
        assert!(cosync.is_empty());
    }
}