use crate::common::{Current, JoinHandle, Named};
use crate::constants::{CoroutineState, SyscallState};
use crate::coroutine::suspender::{Suspender, SuspenderImpl};
use crate::coroutine::{Coroutine, CoroutineImpl, SimpleCoroutine, StateCoroutine};
use crate::scheduler::join::JoinHandleImpl;
use crate::scheduler::listener::Listener;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use open_coroutine_queue::LocalQueue;
use open_coroutine_timer::TimerList;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::panic::UnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use uuid::Uuid;
pub type SchedulableCoroutine<'s> = CoroutineImpl<'s, (), (), Option<usize>>;
pub type SchedulableSuspender<'s> = SuspenderImpl<'s, (), ()>;
pub mod listener;
pub mod join;
mod current;
pub mod has;
#[cfg(test)]
mod tests;
pub trait Scheduler<'s, Join: JoinHandle<Self>>:
Debug + Default + Named + Current<'s> + Listener
{
fn get_stack_size(&self) -> usize;
fn set_stack_size(&self, stack_size: usize);
fn submit_co(
&self,
f: impl FnOnce(&dyn Suspender<Resume = (), Yield = ()>, ()) -> Option<usize>
+ UnwindSafe
+ 'static,
stack_size: Option<usize>,
) -> std::io::Result<Join> {
let coroutine = SchedulableCoroutine::new(
format!("{}|{}", self.get_name(), Uuid::new_v4()),
f,
stack_size.unwrap_or(self.get_stack_size()),
)?;
let co_name = Box::leak(Box::from(coroutine.get_name()));
self.submit_raw_co(coroutine)?;
Ok(Join::new(self, co_name))
}
fn submit_raw_co(&self, coroutine: SchedulableCoroutine<'static>) -> std::io::Result<()>;
fn try_resume(&self, co_name: &str) -> std::io::Result<()>;
fn try_schedule(&self) -> std::io::Result<()> {
self.try_timeout_schedule(std::time::Duration::MAX.as_secs())
.map(|_| ())
}
fn try_timed_schedule(&self, dur: std::time::Duration) -> std::io::Result<u64> {
self.try_timeout_schedule(open_coroutine_timer::get_timeout_time(dur))
}
fn try_timeout_schedule(&self, timeout_time: u64) -> std::io::Result<u64>;
fn try_get_co_result(&self, co_name: &str) -> Option<Result<Option<usize>, &'s str>>;
fn is_empty(&self) -> bool {
self.size() == 0
}
fn size(&self) -> usize;
fn add_listener(&mut self, listener: impl Listener + 's);
}
static mut SUSPEND_TABLE: Lazy<TimerList<SchedulableCoroutine>> = Lazy::new(TimerList::default);
static mut SYSTEM_CALL_TABLE: Lazy<HashMap<&str, SchedulableCoroutine>> = Lazy::new(HashMap::new);
static mut SYSTEM_CALL_SUSPEND_TABLE: Lazy<TimerList<&str>> = Lazy::new(TimerList::default);
#[repr(C)]
#[derive(Debug)]
pub struct SchedulerImpl<'s> {
name: String,
scheduling: AtomicBool,
stack_size: AtomicUsize,
ready: LocalQueue<'s, SchedulableCoroutine<'static>>,
results: DashMap<&'s str, Result<Option<usize>, &'s str>>,
listeners: VecDeque<Box<dyn Listener + 's>>,
}
impl<'s> SchedulerImpl<'s> {
#[must_use]
pub fn new(name: String, stack_size: usize) -> Self {
let mut scheduler = SchedulerImpl {
name,
scheduling: AtomicBool::new(false),
stack_size: AtomicUsize::new(stack_size),
ready: LocalQueue::default(),
results: DashMap::default(),
listeners: VecDeque::default(),
};
scheduler.init();
scheduler
}
fn init(&mut self) {
#[cfg(all(unix, feature = "preemptive-schedule"))]
self.add_listener(crate::monitor::creator::MonitorTaskCreator::default());
}
fn check_ready(&self) -> std::io::Result<()> {
unsafe {
for _ in 0..SUSPEND_TABLE.len() {
if let Some((exec_time, _)) = SUSPEND_TABLE.front() {
if open_coroutine_timer::now() < *exec_time {
break;
}
if let Some((_, mut entry)) = SUSPEND_TABLE.pop_front() {
for _ in 0..entry.len() {
if let Some(coroutine) = entry.pop_front() {
coroutine.ready()?;
self.ready.push_back(coroutine);
}
}
}
}
}
for _ in 0..SYSTEM_CALL_SUSPEND_TABLE.entry_len() {
if let Some((exec_time, _)) = SYSTEM_CALL_SUSPEND_TABLE.front() {
if open_coroutine_timer::now() < *exec_time {
break;
}
if let Some((_, mut entry)) = SYSTEM_CALL_SUSPEND_TABLE.pop_front() {
while let Some(co_name) = entry.pop_front() {
if let Some(coroutine) = SYSTEM_CALL_TABLE.remove(&co_name) {
match coroutine.state() {
CoroutineState::SystemCall(val, syscall, state) => {
if let SyscallState::Suspend(_) = state {
coroutine.syscall(
val,
syscall,
SyscallState::Timeout,
)?;
}
self.ready.push_back(coroutine);
}
_ => {
unreachable!("check_ready should never execute to here")
}
}
}
}
}
}
}
}
Ok(())
}
}
impl Default for SchedulerImpl<'_> {
fn default() -> Self {
Self::new(
format!("open-coroutine-scheduler-{}", Uuid::new_v4()),
crate::constants::DEFAULT_STACK_SIZE,
)
}
}
impl Drop for SchedulerImpl<'_> {
fn drop(&mut self) {
if !std::thread::panicking() {
assert!(
self.ready.is_empty(),
"There are still coroutines to be carried out in the ready queue:{:#?} !",
self.ready
);
}
}
}
impl Eq for SchedulerImpl<'_> {}
impl PartialEq for SchedulerImpl<'_> {
fn eq(&self, other: &Self) -> bool {
self.get_name().eq(other.get_name())
}
}
impl Named for SchedulerImpl<'_> {
fn get_name(&self) -> &str {
&self.name
}
}
impl<'s> Scheduler<'s, JoinHandleImpl<'s>> for SchedulerImpl<'s> {
fn get_stack_size(&self) -> usize {
self.stack_size.load(Ordering::Acquire)
}
fn set_stack_size(&self, stack_size: usize) {
self.stack_size.store(stack_size, Ordering::Release);
}
fn submit_raw_co(&self, coroutine: SchedulableCoroutine<'static>) -> std::io::Result<()> {
coroutine.ready()?;
self.on_create(&coroutine);
self.ready.push_back(coroutine);
Ok(())
}
fn try_resume(&self, co_name: &str) -> std::io::Result<()> {
let co_name: &str = Box::leak(Box::from(co_name));
unsafe {
if let Some(coroutine) = SYSTEM_CALL_TABLE.remove(co_name) {
self.ready.push_back(coroutine);
}
}
Ok(())
}
fn try_timeout_schedule(&self, timeout_time: u64) -> std::io::Result<u64> {
if self
.scheduling
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
return Ok(timeout_time.saturating_sub(open_coroutine_timer::now()));
}
Self::init_current(self);
self.on_schedule(timeout_time);
loop {
let left_time = timeout_time.saturating_sub(open_coroutine_timer::now());
if left_time == 0 {
Self::clean_current();
self.scheduling.store(false, Ordering::Release);
return Ok(0);
}
if let Err(e) = self.check_ready() {
Self::clean_current();
self.scheduling.store(false, Ordering::Release);
return Err(e);
}
if let Some(mut coroutine) = self.ready.pop_front() {
self.on_resume(timeout_time, &coroutine);
match coroutine.resume() {
Ok(state) => match state {
CoroutineState::SystemCall((), syscall, state) => {
self.on_syscall(timeout_time, &coroutine, syscall, state);
let co_name = Box::leak(Box::from(coroutine.get_name()));
unsafe {
_ = SYSTEM_CALL_TABLE.insert(co_name, coroutine);
if let SyscallState::Suspend(timestamp) = state {
SYSTEM_CALL_SUSPEND_TABLE.insert(timestamp, co_name);
}
}
}
CoroutineState::Suspend((), timestamp) => {
self.on_suspend(timeout_time, &coroutine);
if timestamp > open_coroutine_timer::now() {
unsafe { SUSPEND_TABLE.insert(timestamp, coroutine) };
} else {
self.ready.push_back(coroutine);
}
}
CoroutineState::Complete(result) => {
self.on_complete(timeout_time, &coroutine, result);
let co_name = Box::leak(Box::from(coroutine.get_name()));
assert!(
self.results.insert(co_name, Ok(result)).is_none(),
"not consume result"
);
}
CoroutineState::Error(message) => {
self.on_error(timeout_time, &coroutine, message);
let co_name = Box::leak(Box::from(coroutine.get_name()));
assert!(
self.results.insert(co_name, Err(message)).is_none(),
"not consume result"
);
}
_ => {
Self::clean_current();
self.scheduling.store(false, Ordering::Release);
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"try_timeout_schedule should never execute to here",
));
}
},
Err(e) => {
Self::clean_current();
self.scheduling.store(false, Ordering::Release);
return Err(e);
}
};
} else {
Self::clean_current();
self.scheduling.store(false, Ordering::Release);
return Ok(left_time);
}
}
}
fn try_get_co_result(&self, co_name: &str) -> Option<Result<Option<usize>, &'s str>> {
self.results.remove(co_name).map(|r| r.1)
}
fn size(&self) -> usize {
self.ready.len() + unsafe { SUSPEND_TABLE.len() + SYSTEM_CALL_TABLE.len() }
}
fn add_listener(&mut self, listener: impl Listener + 's) {
self.listeners.push_back(Box::new(listener));
}
}