use std::panic;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::cancel::Cancel;
use crate::coroutine_impl::{
current_cancel_data, run_coroutine, Coroutine, CoroutineImpl, EventSource,
};
use crate::join::JoinHandle;
use crate::scoped::spawn_unsafe;
use crate::std::sync::Mutex;
use crate::std::sync::{AtomicOption, Blocker};
use crate::yield_now::yield_with;
use crate::std::queue::seg_queue::SegQueue as Queue;
/// Reasons why `Cqueue::poll` returns without delivering an event.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PollError {
/// The deadline derived from the `timeout` argument passed before an event was delivered.
Timeout,
/// The event queue was empty and the queue's live-sender counter was zero.
Finished,
}
/// Internal tag distinguishing the two kinds of entries placed on the event queue.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum EventKind {
/// Pushed when a sender's coroutine yields via `EventSender::send`; carries the coroutine.
Normal,
/// Pushed when an `EventSender` is dropped; carries no coroutine.
Done,
}
/// A single entry of the completion queue, as returned by `Cqueue::poll`.
#[derive(Debug)]
pub struct Event {
/// Token of the sender that produced this event (the value given to `Cqueue::add`).
pub token: usize,
/// Value most recently stored by `EventSender::send` on the producing sender.
pub extra: usize,
// Id of the producing sender; used as the index into `Cqueue::selectors`.
id: usize,
// Whether this is a regular event or the "sender dropped" notification.
kind: EventKind,
// Coroutine that yielded to emit this event; `None` for `Done` events.
co: Option<CoroutineImpl>,
}
impl Event {
    /// Hands the coroutine attached to this event (if any) to `run_coroutine`.
    ///
    /// The handle is moved out of `self.co`, so calling this a second time
    /// does nothing.
    fn continue_bottom(&mut self) {
        match self.co.take() {
            Some(parked) => {
                run_coroutine(parked);
            }
            None => {}
        }
    }
}
/// Handle to a selector coroutine registered through `Cqueue::add`.
pub struct Selector {
// Handle of the spawned selector coroutine; cancelled by `remove`.
co: Coroutine,
}
impl Selector {
    /// Consumes the selector and issues a cancel request to its coroutine.
    pub fn remove(self) {
        let target = &self.co;
        // NOTE(review): `cancel` is an unsafe API whose contract is defined
        // outside this file — confirm its preconditions hold for selectors.
        unsafe { target.cancel() };
    }
}
/// Sending half handed to each selector closure; posts events to the owning `Cqueue`.
///
/// Dropping the sender posts a `Done` event and decrements the queue's
/// live-sender counter.
pub struct EventSender<'a> {
// Id assigned at registration; copied into every event this sender posts.
id: usize,
// User token supplied to `Cqueue::add`.
token: usize,
// Payload of the most recent `send` call, read back when the event is built.
extra: AtomicUsize,
// Queue this sender reports to.
cqueue: &'a Cqueue,
}
// Manually asserts that a sender may be moved to another thread; `add_impl`
// moves it into a spawned closure that is bounded by `Send`.
// NOTE(review): soundness relies on `&Cqueue` being safe to share across
// threads — confirm that `Cqueue` is effectively `Sync`.
unsafe impl<'a> Send for EventSender<'a> {}
impl<'a> EventSender<'a> {
    /// Returns the token this sender was registered with.
    pub fn get_token(&self) -> usize {
        self.token
    }

    /// Posts an event carrying `extra` to the owning queue and yields the
    /// current coroutine.
    ///
    /// The cancel state of the current coroutine is checked first
    /// (presumably unwinding if a cancel is pending — see `Cancel::check_cancel`).
    /// The event itself is built in `subscribe`, which receives the yielded
    /// coroutine.
    pub fn send(&self, extra: usize) {
        current_cancel_data().check_cancel();
        // Stash the payload; `subscribe` reads it back when building the event.
        self.extra.store(extra, Ordering::Relaxed);
        yield_with(self);
    }
}
impl<'a> EventSource for EventSender<'a> {
    /// Wraps the yielded coroutine in a `Normal` event, enqueues it and wakes
    /// the poller registered in `to_wake`, if there is one.
    fn subscribe(&mut self, co: CoroutineImpl) {
        let queue = self.cqueue;
        let event = Event {
            id: self.id,
            token: self.token,
            extra: self.extra.load(Ordering::Relaxed),
            kind: EventKind::Normal,
            co: Some(co),
        };
        queue.ev_queue.push(event);

        match queue.to_wake.take() {
            Some(waiter) => {
                waiter.unpark();
            }
            None => {}
        }
    }

    /// Intentionally a no-op: nothing is done with the cancel data when the
    /// coroutine is resumed.
    fn yield_back(&self, _cancel: &'static Cancel) {}
}
impl<'a> Drop for EventSender<'a> {
    /// Announces the end of this sender: enqueues a `Done` event, decrements
    /// the queue's live-sender counter and wakes a registered poller.
    fn drop(&mut self) {
        let queue = self.cqueue;
        let done = Event {
            id: self.id,
            token: self.token,
            extra: self.extra.load(Ordering::Relaxed),
            kind: EventKind::Done,
            co: None,
        };
        // The event goes in before the counter drops, so the `Done` entry is
        // already queued by the time the counter reaches its new value.
        queue.ev_queue.push(done);
        queue.cnt.fetch_sub(1, Ordering::Relaxed);

        match queue.to_wake.take() {
            Some(waiter) => {
                waiter.unpark();
            }
            None => {}
        }
    }
}
/// Completion queue that collects events from a set of selector coroutines.
pub struct Cqueue {
// Events pushed by senders and popped by `poll`.
ev_queue: Queue<Event>,
// Blocker of a poller that is about to park; senders take it and unpark it.
to_wake: AtomicOption<Arc<Blocker>>,
// Number of live senders: incremented in `add_impl`, decremented when a sender drops.
cnt: AtomicUsize,
// Join handles of the spawned selectors, indexed by sender id; taken by `check_panic`.
selectors: Mutex<Vec<Option<JoinHandle<()>>>>,
// Number of selectors added so far; the current value becomes the next sender id.
total: AtomicUsize,
// Set once a selector panic has been re-raised; later `check_panic` calls return early.
is_panicking: AtomicBool,
}
impl Cqueue {
    /// Spawns `f` as a selector coroutine bound to this queue.
    ///
    /// The closure receives an `EventSender` carrying `token`. The returned
    /// `Selector` holds a clone of the coroutine handle so it can be
    /// cancelled later.
    fn add_impl<'a, F>(&self, token: usize, f: F) -> Selector
    where
        F: FnOnce(EventSender) + Send + 'a,
    {
        let sender = EventSender {
            // The id doubles as the index of this selector's join handle in
            // `selectors` (see `check_panic`).
            id: self.total.load(Ordering::Relaxed),
            token,
            extra: 0.into(),
            cqueue: self,
        };
        // NOTE(review): `spawn_unsafe` lets the closure borrow `self` without
        // a `'static` bound; `Drop for Cqueue` polls until every sender is
        // gone, which presumably is what keeps that borrow valid — confirm.
        let h = unsafe { spawn_unsafe(move || f(sender)) };
        let co = h.coroutine().clone();
        self.cnt.fetch_add(1, Ordering::Relaxed);
        self.total.fetch_add(1, Ordering::Relaxed);
        self.selectors.lock().unwrap().push(Some(h));
        Selector { co }
    }

    /// Registers a selector: runs `f` in a new coroutine and hands it an
    /// `EventSender` tagged with `token`.
    ///
    /// Returns a `Selector` that can be used to cancel the coroutine.
    pub fn add<'a, F>(&self, token: usize, f: F) -> Selector
    where
        F: FnOnce(EventSender) + Send + 'a,
    {
        self.add_impl(token, f)
    }

    /// Joins the selector with the given `id` and re-raises its panic, if any.
    ///
    /// A panic whose payload is `generator::Error::Cancel` is ignored. Once a
    /// panic has been re-raised, later calls return immediately.
    ///
    /// # Panics
    ///
    /// Re-raises the selector's panic payload. Also panics if no join handle
    /// is stored for `id`.
    fn check_panic(&self, id: usize) {
        if self.is_panicking.load(Ordering::Relaxed) {
            return;
        }
        use generator::Error;

        // Take the handle in its own statement so the `selectors` guard is
        // released before the blocking `join` and before any unwinding below.
        // Holding it across `resume_unwind` would drop the guard while
        // panicking.
        let handle = self
            .selectors
            .lock()
            .unwrap()
            .get_mut(id)
            .and_then(Option::take);
        let handle = handle.expect("join handler not set");

        match handle.join() {
            Ok(_) => {}
            Err(panic) => {
                if let Some(err) = panic.downcast_ref::<Error>() {
                    // A cancelled selector is not an error.
                    if *err == Error::Cancel {
                        return;
                    }
                }
                self.is_panicking.store(true, Ordering::Relaxed);
                panic::resume_unwind(panic);
            }
        }
    }

    /// Waits for the next event and resumes the coroutine that produced it.
    ///
    /// `timeout` bounds the total time spent in this call; `None` waits
    /// indefinitely.
    ///
    /// # Errors
    ///
    /// * `PollError::Finished` — the queue is empty and no sender is alive.
    /// * `PollError::Timeout` — the deadline passed without an event.
    ///
    /// # Panics
    ///
    /// Re-raises the panic of a finished selector (see `check_panic`).
    pub fn poll(&self, timeout: Option<Duration>) -> Result<Event, PollError> {
        // Handles one popped event: `Done` entries are consumed internally and
        // the loop restarts; regular events resume their coroutine and are
        // returned to the caller.
        macro_rules! run_ev {
            ($ev:ident) => {{
                if $ev.kind == EventKind::Done {
                    self.check_panic($ev.id);
                    continue;
                }
                $ev.continue_bottom();
                return Ok($ev);
            }};
        }

        let deadline = timeout.map(|dur| Instant::now() + dur);
        loop {
            match self.ev_queue.pop() {
                Some(mut ev) => run_ev!(ev),
                None => {
                    if self.cnt.load(Ordering::Relaxed) == 0 {
                        return Err(PollError::Finished);
                    }
                }
            }

            let cur = Blocker::current();
            // Register as the waiter before re-checking the queue, so a sender
            // that pushes in between will find and wake this blocker.
            self.to_wake.swap(cur.clone());

            // Park only for what is left of the caller's budget. Parking for
            // the full `timeout` on every iteration would restart the wait
            // after each `Done` event or spurious wake-up.
            let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));

            match self.ev_queue.pop() {
                None => {
                    cur.park(remaining).ok();
                }
                Some(mut ev) => {
                    // An event arrived between the two checks. Whoever ends up
                    // holding the registered blocker unparks it, and the park
                    // below presumably consumes that wake-up so it cannot leak
                    // into a later park.
                    if let Some(w) = self.to_wake.take() {
                        w.unpark();
                    }
                    cur.park(remaining).ok();
                    run_ev!(ev);
                }
            }

            match deadline {
                Some(d) if Instant::now() >= d => return Err(PollError::Timeout),
                _ => {}
            }
        }
    }
}
impl Drop for Cqueue {
    /// Cancels every selector that has not finished yet, then keeps polling
    /// until the queue reports `Finished`.
    fn drop(&mut self) {
        // The guard stays alive for the whole loop, exactly as long as the
        // cancel requests are being issued.
        for handle in self.selectors.lock().unwrap().iter().flatten() {
            if !handle.is_done() {
                unsafe { handle.coroutine().cancel() }
            }
        }

        // Drain remaining events; with no timeout only `Finished` can end the loop.
        loop {
            match self.poll(None) {
                Ok(_) => {}
                Err(PollError::Finished) => break,
                Err(PollError::Timeout) => unreachable!("cqueue drop unreachable"),
            }
        }
    }
}
/// Creates a fresh, empty `Cqueue`, runs `f` with a reference to it and
/// returns whatever `f` returns.
///
/// The queue is dropped before this function returns, which (via
/// `Drop for Cqueue`) cancels unfinished selectors and waits for them.
pub fn scope<'a, F, R>(f: F) -> R
where
    F: FnOnce(&Cqueue) -> R + 'a,
{
    let cq = Cqueue {
        ev_queue: Queue::new(),
        to_wake: AtomicOption::none(),
        cnt: AtomicUsize::default(),
        selectors: Mutex::new(Vec::new()),
        total: AtomicUsize::default(),
        is_panicking: AtomicBool::default(),
    };
    let outcome = f(&cq);
    // Tear the queue down explicitly before handing the result back.
    drop(cq);
    outcome
}