use crate::common::constants::CoroutineState;
use crate::common::ordered_work_steal::Ordered;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::{impl_current_for, impl_display_by_debug, impl_for_named};
use std::collections::VecDeque;
use std::ffi::c_longlong;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
pub mod suspender;
pub mod local;
pub mod listener;
#[cfg(feature = "korosensei")]
pub use korosensei::Coroutine;
#[cfg(feature = "korosensei")]
mod korosensei;
#[macro_export]
macro_rules! co {
($name:expr, $f:expr, $size:expr, $priority:expr $(,)?) => {
$crate::coroutine::Coroutine::new($name, $f, $size, $priority)
};
($f:expr, $size:literal, $priority:literal $(,)?) => {
$crate::coroutine::Coroutine::new(None, $f, $size, Some($priority))
};
($name:expr, $f:expr, $size:expr $(,)?) => {
$crate::coroutine::Coroutine::new($name, $f, $size, None)
};
($f:expr, $size:literal $(,)?) => {
$crate::coroutine::Coroutine::new(None, $f, $size, None)
};
($name:expr, $f:expr $(,)?) => {
$crate::coroutine::Coroutine::new($name, $f, None, None)
};
($f:expr $(,)?) => {
$crate::coroutine::Coroutine::new(None, $f, None, None)
};
}
#[repr(C)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct StackInfo {
pub stack_top: usize,
pub stack_bottom: usize,
}
mod state;
impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
pub fn name(&self) -> &str {
&self.name
}
pub fn state(&self) -> CoroutineState<Yield, Return>
where
Yield: Copy,
Return: Copy,
{
self.state.get()
}
pub fn add_listener(&mut self, listener: impl Listener<Yield, Return> + 'c) {
self.add_raw_listener(Box::leak(Box::new(listener)));
}
pub unsafe fn remaining_stack(&self) -> usize {
let current_sp = psm::stack_pointer() as usize;
current_sp - self.stack_infos_ref().back().unwrap().stack_bottom
}
pub fn stack_infos(&self) -> VecDeque<StackInfo> {
self.stack_infos_ref().clone()
}
pub fn stack_ptr_in_bounds(&self, stack_ptr: u64) -> bool {
for info in self.stack_infos_ref() {
if info.stack_bottom as u64 <= stack_ptr && stack_ptr < info.stack_top as u64 {
return true;
}
}
false
}
#[inline(always)]
#[allow(clippy::inline_always)]
#[doc = include_str!("../../docs/en/scalable-stack.md")]
pub fn maybe_grow<R, F: FnOnce() -> R>(callback: F) -> std::io::Result<R> {
Self::maybe_grow_with(
crate::common::default_red_zone(),
crate::common::constants::DEFAULT_STACK_SIZE,
callback,
)
}
#[cfg(unix)]
fn setup_sigvtalrm_handler() {
use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal};
use std::sync::atomic::{AtomicBool, Ordering};
static CANCEL_HANDLER_INITED: AtomicBool = AtomicBool::new(false);
if CANCEL_HANDLER_INITED
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
extern "C" fn sigvtalrm_handler<Param, Yield>(_: libc::c_int) {
if let Ok(mut set) = SigSet::thread_get_mask() {
set.remove(Signal::SIGVTALRM);
set.thread_set_mask()
.expect("Failed to remove SIGVTALRM signal mask!");
if let Some(suspender) = suspender::Suspender::<Param, Yield>::current() {
suspender.cancel();
}
}
}
let mut set = SigSet::empty();
set.add(Signal::SIGVTALRM);
let sa = SigAction::new(
SigHandler::Handler(sigvtalrm_handler::<Param, Yield>),
SaFlags::SA_RESTART,
set,
);
unsafe {
_ = sigaction(Signal::SIGVTALRM, &sa).expect("install SIGVTALRM handler failed !");
}
}
}
}
impl<Yield, Return> Coroutine<'_, (), Yield, Return>
where
Yield: Debug + Copy + Eq + 'static,
Return: Debug + Copy + Eq + 'static,
{
pub fn resume(&mut self) -> std::io::Result<CoroutineState<Yield, Return>> {
self.resume_with(())
}
}
impl<Param, Yield, Return> Coroutine<'_, Param, Yield, Return>
where
Param: 'static,
Yield: Debug + Copy + Eq + 'static,
Return: Debug + Copy + Eq + 'static,
{
pub fn resume_with(&mut self, arg: Param) -> std::io::Result<CoroutineState<Yield, Return>> {
let current = self.state();
if let CoroutineState::Complete(r) = current {
return Ok(CoroutineState::Complete(r));
}
if let CoroutineState::Error(e) = current {
return Ok(CoroutineState::Error(e));
}
Self::init_current(self);
self.running()?;
#[cfg(unix)]
Self::setup_sigvtalrm_handler();
let r = self.raw_resume(arg);
Self::clean_current();
r
}
}
impl<Param, Yield, Return> Debug for Coroutine<'_, Param, Yield, Return>
where
Yield: Debug + Copy,
Return: Debug + Copy,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Coroutine")
.field("name", &self.name())
.field("state", &self.state())
.field("stack_infos", &self.stack_infos())
.field("local", &self.local)
.field("priority", &self.priority)
.finish()
}
}
impl<'c, Param, Yield, Return> Deref for Coroutine<'c, Param, Yield, Return> {
type Target = CoroutineLocal<'c>;
fn deref(&self) -> &Self::Target {
&self.local
}
}
impl<Param, Yield, Return> Ordered for Coroutine<'_, Param, Yield, Return> {
fn priority(&self) -> Option<c_longlong> {
self.priority
}
}
impl_display_by_debug!(
Coroutine<'c, Param, Yield, Return>
where
Yield: Debug + Copy,
Return: Debug + Copy
);
impl_for_named!(Coroutine<'c, Param, Yield, Return>);
impl_current_for!(COROUTINE, Coroutine<'c, Param, Yield, Return>);