use std::panic;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::cancel::Cancel;
use crate::coroutine_impl::{
current_cancel_data, run_coroutine, Coroutine, CoroutineImpl, EventSource,
};
use crate::join::JoinHandle;
use crate::scoped::spawn_unsafe;
use crate::std::sync::Mutex;
use crate::std::sync::{AtomicOption, Blocker};
use crate::yield_now::yield_with;
use crate::std::queue::seg_queue::SegQueue as Queue;
/// This enumeration is the list of the possible reasons that `poll`
/// could not return Event when called.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PollError {
/// This cqueue currently has no event and timeout happens
/// so data may become available in future
Timeout,
/// This cqueue associated select coroutines are all finished
/// so there will never be any more event received on it unless
/// subscribe new select coroutines by using `add`
Finished,
}
/// This enumeration is the list of the possible reasons that an event
/// is generated
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum EventKind {
/// the select coroutine has successfully generated an event from top half
/// so we can continue it's bottom half after call the `poll`
Normal,
/// indicate a select coroutine is finished
Done,
}
/// The event that `poll` would return, events are generated when a select coroutine
/// has finished it's top half
#[derive(Debug)]
pub struct Event {
/// the token associated with the select coroutine
pub token: usize,
/// the select coroutine can use it to pass extra data with the caller
pub extra: usize,
/// id of the select coroutine, used internally to locate the JoinHandle
id: usize,
/// the event type
kind: EventKind,
// the async coroutine that work on an select
co: Option<CoroutineImpl>,
}
impl Event {
/// continue the select coroutine with it's bottom half
/// when `poll` got a Normal event, should always call it first
fn continue_bottom(&mut self) {
if let Some(co) = self.co.take() {
run_coroutine(co);
}
}
}
/// a handle type for the select coroutine
/// you can only use the `remove` method to manually delete the coroutine
pub struct Selector {
co: Coroutine,
}
impl Selector {
/// terminate the select coroutine
/// this would remove the selector from the associated cqueue
pub fn remove(self) {
unsafe { self.co.cancel() };
}
}
/// each select coroutine would use this struct to communicate with
/// the cqueue. the struct is created in `add` for each select coroutine
pub struct EventSender<'a> {
// index of the select coroutine
id: usize,
// associated token, passed from `add`
token: usize,
// the select coroutine can use it to pass extra data to the caller
extra: AtomicUsize,
// the mpsc event queue to collect the events
cqueue: &'a Cqueue,
}
unsafe impl<'a> Send for EventSender<'a> {}
impl<'a> EventSender<'a> {
/// get the token
pub fn get_token(&self) -> usize {
self.token
}
/// send out the event
pub fn send(&self, extra: usize) {
let cancel = current_cancel_data();
cancel.check_cancel();
self.extra.store(extra, Ordering::Relaxed);
yield_with(self);
}
}
impl<'a> EventSource for EventSender<'a> {
fn subscribe(&mut self, co: CoroutineImpl) {
self.cqueue.ev_queue.push(Event {
id: self.id,
token: self.token,
extra: self.extra.load(Ordering::Relaxed),
kind: EventKind::Normal,
co: Some(co),
});
if let Some(w) = self.cqueue.to_wake.take() {
w.unpark();
}
}
fn yield_back(&self, _cancel: &'static Cancel) {
// ignore the cancel to let the bottom half get processed
}
}
impl<'a> Drop for EventSender<'a> {
// when the select coroutine finished will trigger this drop
fn drop(&mut self) {
self.cqueue.ev_queue.push(Event {
id: self.id,
token: self.token,
extra: self.extra.load(Ordering::Relaxed),
kind: EventKind::Done,
co: None,
});
self.cqueue.cnt.fetch_sub(1, Ordering::Relaxed);
if let Some(w) = self.cqueue.to_wake.take() {
w.unpark();
}
}
}
/// cqueue interface for general select model
pub struct Cqueue {
// the mpsc queue that transfer event
ev_queue: Queue<Event>,
// thread/coroutine for wake up
to_wake: AtomicOption<Arc<Blocker>>,
// track how many coroutines left
cnt: AtomicUsize,
// store the select coroutine handles
selectors: Mutex<Vec<Option<JoinHandle<()>>>>,
// total created select coroutines
total: AtomicUsize,
// panic status
is_panicking: AtomicBool,
}
impl Cqueue {
/// register a select coroutine with the cqueue
/// should use `cqueue_add` and `cqueue_add_oneshot` macros to
/// create select coroutines correctly
fn add_impl<'a, F>(&self, token: usize, f: F) -> Selector
where
F: FnOnce(EventSender) + Send + 'a,
{
let sender = EventSender {
id: self.total.load(Ordering::Relaxed),
token,
extra: 0.into(),
cqueue: self,
};
let h = unsafe { spawn_unsafe(move || f(sender)) };
let co = h.coroutine().clone();
self.cnt.fetch_add(1, Ordering::Relaxed);
self.total.fetch_add(1, Ordering::Relaxed);
self.selectors.lock().unwrap().push(Some(h));
Selector { co }
}
/// register a select coroutine with the cqueue
/// should use `cqueue_add` and `cqueue_add_oneshot` macros to
/// create select coroutines correctly
pub fn add<'a, F>(&self, token: usize, f: F) -> Selector
where
F: FnOnce(EventSender) + Send + 'a,
{
self.add_impl(token, f)
}
// when the select coroutine is done, check the panic status
// if it's panicked, re throw the panic data
fn check_panic(&self, id: usize) {
if self.is_panicking.load(Ordering::Relaxed) {
return;
}
use generator::Error;
match self.selectors.lock().unwrap()[id]
.take()
.expect("join handler not set")
.join()
{
Ok(_) => {}
Err(panic) => {
if let Some(err) = panic.downcast_ref::<Error>() {
// ignore the cancel panic
if *err == Error::Cancel {
return;
}
}
self.is_panicking.store(true, Ordering::Relaxed);
panic::resume_unwind(panic);
}
}
}
/// poll an event that is ready to process
/// when the event is returned the bottom half is already run
/// the API is "completion" mode
/// if any panic in select coroutine detected during the poll
/// it will propagate the panic to the caller
pub fn poll(&self, timeout: Option<Duration>) -> Result<Event, PollError> {
macro_rules! run_ev {
($ev:ident) => {{
if $ev.kind == EventKind::Done {
self.check_panic($ev.id);
continue;
}
$ev.continue_bottom();
return Ok($ev);
}};
}
let deadline = timeout.map(|dur| Instant::now() + dur);
loop {
match self.ev_queue.pop() {
Some(mut ev) => run_ev!(ev),
None => {
if self.cnt.load(Ordering::Relaxed) == 0 {
return Err(PollError::Finished);
}
}
}
let cur = Blocker::current();
// register the waiter
self.to_wake.swap(cur.clone());
// re-check the queue
match self.ev_queue.pop() {
None => {
cur.park(timeout).ok();
}
Some(mut ev) => {
if let Some(w) = self.to_wake.take() {
w.unpark();
}
cur.park(timeout).ok();
run_ev!(ev);
}
}
// check the timeout
match deadline {
Some(d) if Instant::now() >= d => return Err(PollError::Timeout),
_ => {}
}
}
}
}
impl Drop for Cqueue {
// this would cancel all unfinished select coroutines
// and wait until all of them return back
fn drop(&mut self) {
// first cancel all the select coroutines if they are running
self.selectors
.lock()
.unwrap()
.iter()
.map(|j| j.as_ref())
.fold((), |_, join| match join {
Some(j) if !j.is_done() => unsafe { j.coroutine().cancel() },
_ => {}
});
// if self.is_panicking {
// return;
// }
// run the rest event
loop {
match self.poll(None) {
Ok(_) => {}
Err(_e @ PollError::Finished) => break,
_ => unreachable!("cqueue drop unreachable"),
}
}
// we are sure that all the coroutines are finished
}
}
/// Create a new `scope`, for select coroutines.
///
/// Scopes, in particular, support scoped select coroutine spawning.
///
pub fn scope<'a, F, R>(f: F) -> R
where
F: FnOnce(&Cqueue) -> R + 'a,
{
let cqueue = Cqueue {
ev_queue: Queue::new(),
to_wake: AtomicOption::none(),
cnt: AtomicUsize::new(0),
selectors: Mutex::new(Vec::new()),
total: AtomicUsize::new(0),
is_panicking: AtomicBool::new(false),
};
f(&cqueue)
}