use std::fmt;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use crate::cancel::Cancel;
use crate::config::config;
use crate::err;
use crate::join::{make_join_handle, Join, JoinHandle};
use crate::local::get_co_local_data;
use crate::local::CoroutineLocal;
use crate::park::Park;
use crate::scheduler::get_scheduler;
use crossbeam::atomic::AtomicCell;
use mco_gen::{Generator, Gn};
pub type EventResult = io::Error;
pub struct EventSubscriber {
resource: *mut dyn EventSource,
}
unsafe impl Send for EventSubscriber {}
impl EventSubscriber {
pub fn new(r: *mut dyn EventSource) -> Self {
EventSubscriber { resource: r }
}
pub fn subscribe(self, c: CoroutineImpl) {
let resource = unsafe { &mut *self.resource };
resource.subscribe(c);
}
}
pub trait EventSource {
fn subscribe(&mut self, _c: CoroutineImpl);
fn yield_back(&self, cancel: &'static Cancel) {
cancel.check_cancel();
}
}
pub struct Done;
impl Done {
fn drop_coroutine(co: CoroutineImpl) {
let local = unsafe { Box::from_raw(get_co_local(&co)) };
let name = local.get_co().name();
let (size, used) = co.stack_usage();
if used == size {
eprintln!("stack overflow detected, size={}", size);
::std::process::exit(1);
}
if local.get_co().stack_size() & 1 == 1 {
println!(
"coroutine name = {:?}, stack size = {}, used size = {}",
name, size, used
);
}
if size == config().get_stack_size() {
get_scheduler().pool.put(co);
}
}
}
impl EventSource for Done {
fn subscribe(&mut self, co: CoroutineImpl) {
Self::drop_coroutine(co);
}
}
pub type CoroutineImpl = Generator<'static, EventResult, EventSubscriber>;
#[inline]
#[allow(clippy::cast_ptr_alignment)]
fn get_co_local(co: &CoroutineImpl) -> *mut CoroutineLocal {
co.get_local_data() as *mut CoroutineLocal
}
struct Inner {
name: Option<String>,
stack_size: usize,
park: Park,
cancel: Cancel,
}
#[derive(Clone)]
pub struct Coroutine {
inner: Arc<Inner>,
}
unsafe impl Send for Coroutine {}
impl Coroutine {
fn new(name: Option<String>, stack_size: usize) -> Coroutine {
Coroutine {
inner: Arc::new(Inner {
name,
stack_size,
park: Park::new(),
cancel: Cancel::new(),
}),
}
}
pub fn stack_size(&self) -> usize {
self.inner.stack_size
}
pub fn unpark(&self) {
self.inner.park.unpark();
}
pub fn cancel(&self) {
let _ = self.inner.cancel.cancel();
}
pub fn name(&self) -> Option<&str> {
self.inner.name.as_deref()
}
#[cfg(unix)]
pub(crate) fn get_cancel(&self) -> &Cancel {
&self.inner.cancel
}
}
impl fmt::Debug for Coroutine {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.name(), f)
}
}
#[derive(Default)]
pub struct Builder {
name: Option<String>,
stack_size: Option<usize>,
}
impl Builder {
pub fn new() -> Builder {
Builder {
name: None,
stack_size: None,
}
}
pub fn name(mut self, name: String) -> Builder {
self.name = Some(name);
self
}
pub fn stack_size(mut self, size: usize) -> Builder {
self.stack_size = Some(size);
self
}
fn spawn_impl<F, T>(self, f: F) -> (CoroutineImpl, JoinHandle<T>)
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
static DONE: Done = Done {};
let sched = get_scheduler();
let Builder { name, stack_size } = self;
let stack_size = stack_size.unwrap_or_else(|| config().get_stack_size());
let _co = if stack_size == config().get_stack_size() {
let co = sched.pool.get();
co.prefetch();
Some(co)
} else {
None
};
let panic = Arc::new(AtomicCell::new(None));
let join = Arc::new(Join::new(panic.clone()));
let packet = Arc::new(AtomicCell::new(None));
let their_join = join.clone();
let their_packet = packet.clone();
let subscriber = EventSubscriber {
resource: &DONE as &dyn EventSource as *const _ as *mut dyn EventSource,
};
let closure = move || {
their_packet.swap(Some(f()));
their_join.trigger();
subscriber
};
let mut co = if let Some(mut c) = _co {
c.init_code(closure);
c
} else {
Gn::new_opt(stack_size, closure)
};
let handle = Coroutine::new(name, stack_size);
let local = CoroutineLocal::new(handle.clone(), join.clone());
co.set_local_data(Box::into_raw(local) as *mut u8);
(co, make_join_handle(handle, join, packet, panic))
}
pub fn spawn<F, T>(self, f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
return self.spawn_local(f);
}
pub fn spawn_local<F, T>(self, f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (co, handle) = self.spawn_impl(f);
run_coroutine(co);
handle
}
}
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f)
}
#[inline]
pub fn current() -> Coroutine {
match get_co_local_data() {
None => panic!("no current coroutine, did you call `current()` in thread context?"),
Some(local) => unsafe { local.as_ref() }.get_co().clone(),
}
}
#[inline]
pub fn try_current() -> Result<Coroutine, crate::std::errors::Error> {
match get_co_local_data() {
None => Err(err!(
"no current coroutine, did you call `current()` in thread context?"
)),
Some(local) => Ok(unsafe { local.as_ref() }.get_co().clone()),
}
}
#[inline]
pub fn is_coroutine() -> bool {
get_co_local_data().is_some()
}
#[inline]
pub(crate) fn current_cancel_data() -> &'static Cancel {
match get_co_local_data() {
None => panic!("no cancel data, did you call `current_cancel_data()` in thread context?"),
Some(local) => &(unsafe { &*local.as_ptr() }.get_co().inner.cancel),
}
}
#[inline]
pub(crate) fn co_cancel_data(co: &CoroutineImpl) -> &'static Cancel {
let local = unsafe { &*get_co_local(co) };
&local.get_co().inner.cancel
}
#[cfg(unix)]
pub(crate) fn co_get_handle(co: &CoroutineImpl) -> Coroutine {
let local = unsafe { &*get_co_local(co) };
local.get_co().clone()
}
#[inline]
fn park_timeout_impl(dur: Option<Duration>) {
if !is_coroutine() {
return;
}
let co_handle = current();
co_handle.inner.park.park_timeout(dur).ok();
}
pub fn park() {
park_timeout_impl(None);
}
pub fn park_timeout(dur: Duration) {
park_timeout_impl(Some(dur));
}
#[inline]
pub(crate) fn run_coroutine(mut co: CoroutineImpl) {
match co.resume() {
Some(ev) => ev.subscribe(co),
None => {
let local = unsafe { &mut *get_co_local(&co) };
let join = local.get_join();
if let Some(panic) = co.get_panic_data() {
join.set_panic_data(panic);
}
join.trigger();
Done::drop_coroutine(co);
}
}
}