use slab;
use num_cpus;
use thread_scoped;
use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;
use std::mem;
use std;
use mio_orig::{self, Token, EventLoop, EventLoopConfig};
use mio_orig::Handler as MioHandler;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use self::sync::mpsc;
use std::ptr;
use self::timer::Timer;
use owning_ref::{ArcRef, ErasedArcRef};
macro_rules! thread_trace_fmt_prefix {
() => ("T{}: ")
}
macro_rules! co_trace_fmt_prefix {
() => ("T{}: C{}: ")
}
macro_rules! thread_debug {
(target: $target:expr, $thread:expr, $fmt:tt, $($arg:tt)*) => (
debug!(target: $target,
concat!(thread_trace_fmt_prefix!(), $fmt),
$thread.thread_id(),
$($arg)*
);
);
($thread:expr, $fmt:tt, $($arg:tt)*) => (
debug!(
concat!(thread_trace_fmt_prefix!(), $fmt),
$thread.thread_id(),
$($arg)*
);
);
($co:expr, $fmt:tt) => (
thread_debug!($co, $fmt,)
);
}
macro_rules! co_debug {
(target: $target:expr, $co:expr, $fmt:tt, $($arg:tt)*) => (
debug!(target: $target,
concat!(co_trace_fmt_prefix!(), $fmt),
$co.handler_shared().thread_id(),
$co.id.as_usize(),
$($arg)*
);
);
($co:expr, $fmt:tt, $($arg:tt)*) => (
debug!(
concat!(co_trace_fmt_prefix!(), $fmt),
$co.handler_shared().thread_id(),
$co.id.as_usize(),
$($arg)*
);
);
($co:expr, $fmt:tt) => (
co_debug!($co, $fmt,)
);
}
pub mod sync;
pub mod timer;
#[cfg(not(windows))]
pub mod unix;
pub mod tcp;
pub mod udp;
pub use self::evented::{Evented, MioAdapter, EventedImpl, RcEventSource, EventSourceTrait};
mod evented;
use self::coroutine::{Coroutine, RcCoroutine};
mod coroutine;
pub use self::thread::Handler;
use self::thread::Message;
use self::thread::{tl_current_coroutine, tl_current_coroutine_ptr};
mod thread;
mod thunk;
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct RW {
read: bool,
write: bool,
}
impl RW {
pub fn read() -> Self {
RW {
read: true,
write: false,
}
}
pub fn write() -> Self {
RW {
read: false,
write: true,
}
}
pub fn both() -> Self {
RW {
read: true,
write: true,
}
}
fn none() -> Self {
RW {
read: false,
write: false,
}
}
fn has_read(&self) -> bool {
self.read
}
fn has_write(&self) -> bool {
self.write
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Event {
id: EventSourceId,
rw: RW,
}
impl Event {
pub fn id(&self) -> EventSourceId {
self.id
}
pub fn has_read(&self) -> bool {
self.rw.has_read()
}
pub fn has_write(&self) -> bool {
self.rw.has_write()
}
}
fn sender_retry<M: Send>(sender: &mio_orig::Sender<M>, msg: M) {
let mut msg = Some(msg);
let mut warning_printed = false;
let mut counter = 0;
loop {
match sender.send(msg.take().expect("sender_retry")) {
Ok(()) => break,
Err(mio_orig::NotifyError::Closed(_)) => panic!("Closed channel on sender.send()."),
Err(mio_orig::NotifyError::Io(_)) => panic!("IO error on sender.send()."),
Err(mio_orig::NotifyError::Full(retry_msg)) => {
counter += 1;
msg = Some(retry_msg);
}
}
if counter > 20000 {
panic!("Mio Queue Full, process hangs. consider increasing \
`EventLoopConfig::notify_capacity");
}
if !warning_printed {
warning_printed = true;
warn!("send_retry: retry; consider increasing `EventLoopConfig::notify_capacity`");
}
std::thread::yield_now();
}
}
const EVENT_SOURCE_TOKEN_SHIFT: usize = 10;
const EVENT_SOURCE_TOKEN_MASK: usize = (1 << EVENT_SOURCE_TOKEN_SHIFT) - 1;
fn token_to_ids(token: Token) -> (coroutine::Id, EventSourceId) {
let val = token.as_usize();
(coroutine::Id::new(val >> EVENT_SOURCE_TOKEN_SHIFT),
EventSourceId(val & EVENT_SOURCE_TOKEN_MASK))
}
fn token_from_ids(co_id: coroutine::Id, io_id: EventSourceId) -> Token {
debug_assert!(io_id.as_usize() <= EVENT_SOURCE_TOKEN_MASK);
Token((co_id.as_usize() << EVENT_SOURCE_TOKEN_SHIFT) | io_id.as_usize())
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct EventSourceId(usize);
impl EventSourceId {
fn new(id: usize) -> Self {
EventSourceId(id)
}
fn as_usize(&self) -> usize {
self.0
}
}
impl slab::Index for EventSourceId {
fn as_usize(&self) -> usize {
self.0
}
fn from_usize(i: usize) -> Self {
EventSourceId(i)
}
}
pub trait Scheduler: Sync + Send {
fn spawn_thread(&self) -> Box<SchedulerThread + 'static>;
}
pub trait SchedulerThread {
fn spawned(&mut self,
event_loop: &mut mio_orig::EventLoop<thread::Handler>,
coroutine_ctrl: CoroutineControl);
fn ready(&mut self,
event_loop: &mut mio_orig::EventLoop<thread::Handler>,
coroutine_ctrl: CoroutineControl);
fn tick(&mut self, _event_loop: &mut mio_orig::EventLoop<thread::Handler>) {}
fn timeout(&mut self) -> Option<u64> {
None
}
}
struct FifoScheduler {
thread_num: Arc<AtomicUsize>,
}
impl FifoScheduler {
pub fn new() -> Self {
FifoScheduler { thread_num: Arc::new(AtomicUsize::new(0)) }
}
}
impl Default for FifoScheduler {
fn default() -> Self {
FifoScheduler::new()
}
}
struct FifoSchedulerThread {
thread_id: usize,
thread_i: usize,
thread_num: Arc<AtomicUsize>,
delayed: VecDeque<CoroutineControl>,
}
impl Scheduler for FifoScheduler {
fn spawn_thread(&self) -> Box<SchedulerThread> {
let prev = self.thread_num.fetch_add(1, Ordering::Relaxed);
Box::new(FifoSchedulerThread {
thread_id: prev,
thread_i: prev,
thread_num: self.thread_num.clone(),
delayed: VecDeque::new(),
})
}
}
impl FifoSchedulerThread {
fn thread_next_i(&mut self) -> usize {
let ret = self.thread_i;
self.thread_i += 1;
if self.thread_i >= self.thread_num() {
self.thread_i = 0;
}
ret
}
fn thread_num(&self) -> usize {
self.thread_num.load(Ordering::Relaxed)
}
}
impl SchedulerThread for FifoSchedulerThread {
fn spawned(&mut self,
event_loop: &mut mio_orig::EventLoop<thread::Handler>,
coroutine_ctrl: CoroutineControl) {
let thread_i = self.thread_next_i();
if thread_i == self.thread_id {
coroutine_ctrl.resume(event_loop);
} else {
coroutine_ctrl.migrate(event_loop, thread_i);
}
}
fn ready(&mut self,
event_loop: &mut mio_orig::EventLoop<thread::Handler>,
coroutine_ctrl: CoroutineControl) {
if coroutine_ctrl.is_yielding() {
self.delayed.push_back(coroutine_ctrl);
} else {
coroutine_ctrl.resume(event_loop);
}
}
fn tick(&mut self, event_loop: &mut mio_orig::EventLoop<thread::Handler>) {
let len = self.delayed.len();
for _ in 0..len {
let coroutine_ctrl = self.delayed.pop_front().unwrap();
coroutine_ctrl.resume(event_loop);
}
}
fn timeout(&mut self) -> Option<u64> {
if self.delayed.is_empty() {
None
} else {
Some(1000)
}
}
}
pub struct CoroutineControl {
was_handled: bool,
is_yielding: bool,
rc: RcCoroutine,
}
impl Drop for CoroutineControl {
fn drop(&mut self) {
if !self.was_handled {
self.kill();
}
}
}
impl CoroutineControl {
fn new(rc: RcCoroutine) -> Self {
CoroutineControl {
is_yielding: false,
was_handled: false,
rc: rc,
}
}
pub fn resume(mut self, event_loop: &mut EventLoop<thread::Handler>) {
self.was_handled = true;
let co_rc = self.rc.clone();
debug_assert!(co_rc.borrow().state().is_ready());
coroutine::jump_in(&co_rc);
self.after_resume(event_loop);
}
fn after_resume(&self, event_loop: &mut EventLoop<thread::Handler>) {
if self.rc.borrow_mut().register_all(event_loop) {
self.rc.borrow_mut().deregister_all(event_loop)
}
self.rc.borrow_mut().start_children();
let state = self.rc.borrow().state().clone();
if state.is_yielding() {
debug_assert!(self.rc.borrow().blocked_on.is_empty());
let mut coroutine_ctrl = CoroutineControl::new(self.rc.clone());
coroutine_ctrl.set_is_yielding();
self.rc.borrow_mut().unblock_after_yield();
let rc_coroutine = self.rc.borrow();
let mut handler_shared = rc_coroutine.handler_shared_mut();
handler_shared.add_ready(coroutine_ctrl);
} else if state.is_ready() {
debug_assert!(self.rc.borrow().blocked_on.is_empty());
let coroutine_ctrl = CoroutineControl::new(self.rc.clone());
let rc_coroutine = self.rc.borrow();
let mut handler_shared = rc_coroutine.handler_shared_mut();
handler_shared.add_ready(coroutine_ctrl);
}
}
pub fn migrate(mut self, event_loop: &mut EventLoop<thread::Handler>, thread_id: usize) {
self.was_handled = true;
let sender = {
let mut co = self.rc.borrow_mut();
let handler_shared = co.detach_from(event_loop, thread_id);
let mut handler_shared = handler_shared.borrow_mut();
handler_shared.coroutines.remove(co.id).unwrap();
handler_shared.get_sender_to_thread(thread_id)
};
let rc = self.rc.clone();
drop(self);
sender_retry(&sender, Message::Migration(CoroutineControl::new(rc)));
}
pub fn reattach_to(&mut self,
event_loop: &mut EventLoop<thread::Handler>,
handler: &mut thread::Handler) {
let handler_shared = handler.shared().clone();
let id = handler_shared.borrow_mut().attach(self.rc.clone());
self.rc.borrow_mut().attach_to(event_loop, handler_shared, id);
}
fn set_is_yielding(&mut self) {
self.is_yielding = true
}
pub fn is_yielding(&self) -> bool {
self.is_yielding
}
pub fn get_userdata<T: Any>(&self) -> Option<ErasedArcRef<T>> {
let opt_arcref: Option<ArcRef<_>> = self.rc
.borrow()
.user_data
.clone()
.map(|arc| arc.into());
opt_arcref.and_then(|arcref| {
if (&***arcref.owner() as &Any).downcast_ref::<T>().is_some() {
Some(arcref.map(|ud| (&**ud as &Any).downcast_ref::<T>().unwrap())
.erase_owner())
} else {
None
}
})
}
}
pub struct Mioco {
join_handles: Vec<std::thread::JoinHandle<()>>,
config: Config,
}
impl Mioco {
pub fn new() -> Self {
Mioco::new_configured(Config::new())
}
pub fn new_configured(config: Config) -> Self {
Mioco {
join_handles: Vec::new(),
config: config,
}
}
pub fn start<F, T>(&mut self, f: F) -> std::thread::Result<T>
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
{
let (sender, receiver) = sync::mpsc::channel();
self.run(f, sender);
let join = JoinHandle { receiver: receiver };
join.join()
}
fn run<F, T>(&mut self, f: F, co_exit_sender: coroutine::ExitSender<T>)
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
{
info!("starting instance with {} threads", self.config.thread_num);
let thread_shared = Arc::new(thread::HandlerThreadShared::new(self.config.thread_num));
let mut event_loops = VecDeque::new();
let mut senders = Vec::new();
for _ in 0..self.config.thread_num {
let event_loop = EventLoop::configured(self.config.event_loop_config.clone())
.expect("new EventLoop");
senders.push(event_loop.channel());
event_loops.push_back(event_loop);
}
let sched = self.config.scheduler.spawn_thread();
let first_event_loop = event_loops.pop_front().unwrap();
for i in 1..self.config.thread_num {
let scheduler = self.config.scheduler.clone();
let coroutine_config = self.config.coroutine_config;
let event_loop = event_loops.pop_front().unwrap();
let senders = senders.clone();
let thread_shared = thread_shared.clone();
let join = std::thread::Builder::new()
.name(format!("mioco_thread_{}", i))
.spawn(move || {
let sched = scheduler.spawn_thread();
Mioco::thread_loop::<F, T>(None,
sched,
event_loop,
i,
senders,
thread_shared,
None,
coroutine_config);
});
match join {
Ok(join) => self.join_handles.push(join),
Err(err) => panic!("Couldn't spawn thread: {}", err),
}
}
let mut user_data = None;
mem::swap(&mut user_data, &mut self.config.user_data);
Mioco::thread_loop(Some((f, co_exit_sender)),
sched,
first_event_loop,
0,
senders,
thread_shared,
user_data,
self.config.coroutine_config);
for join in self.join_handles.drain(..) {
let _ = join.join(); }
}
fn thread_loop<F, T>(f_and_sender: Option<(F, coroutine::ExitSender<T>)>,
scheduler: Box<SchedulerThread + 'static>,
mut event_loop: EventLoop<thread::Handler>,
thread_id: usize,
senders: Vec<thread::MioSender>,
thread_shared: thread::ArcHandlerThreadShared,
userdata: Option<coroutine::UserDataAny>,
coroutine_config: coroutine::Config)
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
{
let handler_shared = thread::HandlerShared::new(senders,
thread_shared,
coroutine_config,
thread_id);
let shared = Rc::new(RefCell::new(handler_shared));
if let Some((f, exit_sender)) = f_and_sender {
let coroutine_rc = Coroutine::spawn(shared.clone(), userdata, f, exit_sender);
shared.borrow().signal_start_all();
shared.borrow_mut().add_spawned(CoroutineControl::new(coroutine_rc));
}
let mut handler = thread::Handler::new(shared, scheduler);
handler.shared().borrow().wait_for_start_all();
{
let sh = handler.shared().borrow();
thread_debug!(sh, "event loop: starting");
}
handler.tick(&mut event_loop);
event_loop.run(&mut handler).unwrap();
{
let sh = handler.shared().borrow();
thread_debug!(sh, "event loop: done");
}
}
}
impl Default for Mioco {
fn default() -> Self {
Mioco::new()
}
}
pub struct Config {
thread_num: usize,
scheduler: Arc<Box<Scheduler>>,
event_loop_config: EventLoopConfig,
user_data: Option<coroutine::UserDataAny>,
coroutine_config: coroutine::Config,
}
impl Config {
pub fn new() -> Self {
Config {
thread_num: num_cpus::get(),
scheduler: Arc::new(Box::new(FifoScheduler::new())),
event_loop_config: Default::default(),
user_data: None,
coroutine_config: Default::default(),
}
}
pub fn set_thread_num(&mut self, thread_num: usize) -> &mut Self {
self.thread_num = thread_num;
self
}
pub fn set_scheduler(&mut self, scheduler: Box<Scheduler + 'static>) -> &mut Self {
self.scheduler = Arc::new(scheduler);
self
}
pub unsafe fn set_stack_size(&mut self, stack_size: usize) -> &mut Self {
self.coroutine_config.stack_size = stack_size;
self
}
pub fn set_userdata<T: Any + Send + Sync>(&mut self, data: T) -> &mut Self {
self.user_data = Some(Arc::new(Box::new(data)));
self
}
pub fn event_loop(&mut self) -> &mut EventLoopConfig {
&mut self.event_loop_config
}
pub fn set_catch_panics(&mut self, catch_panics: bool) -> &mut Self {
self.coroutine_config.catch_panics = catch_panics;
self
}
pub unsafe fn set_stack_protection(&mut self, stack_protection: bool) -> &mut Self {
self.coroutine_config.stack_protection = stack_protection;
self
}
}
impl Default for Config {
fn default() -> Self {
Config::new()
}
}
pub fn start<F, T>(f: F) -> std::thread::Result<T>
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
{
Mioco::new().start(f)
}
pub fn start_threads<F, T>(thread_num: usize, f: F) -> std::thread::Result<T>
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
{
let mut config = Config::new();
config.set_thread_num(thread_num);
Mioco::new_configured(config).start(f)
}
pub struct JoinHandle<T> {
receiver: sync::mpsc::Receiver<coroutine::ExitStatus<T>>,
}
impl<T> JoinHandle<T>
where T: Send + 'static
{
pub fn join(self) -> std::thread::Result<T> {
match self.receiver.recv() {
Ok(t) => t,
Err(err) => Err(Box::new(err)),
}
}
}
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
{
let coroutine = tl_current_coroutine_ptr();
let (sender, receiver) = sync::mpsc::channel();
if coroutine == ptr::null_mut() {
std::thread::spawn(|| {
Mioco::new().run(f, sender);
});
} else {
let coroutine = unsafe { tl_current_coroutine() };
coroutine.spawn_child(f, sender)
}
JoinHandle { receiver: receiver }
}
pub fn shutdown() -> ! {
let coroutine = unsafe { tl_current_coroutine() };
{
let shared = coroutine.handler_shared();
shared.broadcast_shutdown();
}
loop {
yield_now();
}
}
pub fn in_coroutine() -> bool {
let coroutine = tl_current_coroutine_ptr();
coroutine != ptr::null_mut()
}
pub fn offload<'b, F, R>(f: F) -> R
where F: FnOnce() -> R + 'b,
F: Send,
R: Send
{
let coroutine = unsafe { tl_current_coroutine() };
if coroutine.sync_channel.is_none() {
let (send, recv) = mpsc::channel();
coroutine.sync_channel = Some((send, recv));
}
let &(ref tx, ref rx) = coroutine.sync_channel.as_ref().unwrap();
let join = unsafe {
thread_scoped::scoped({
let sender = tx.clone();
move || {
let res = f();
sender.send(()).unwrap();
res
}
})
};
rx.recv().unwrap();
join.join()
}
pub fn get_userdata<T: Any>() -> Option<ErasedArcRef<T>> {
let coroutine = unsafe { tl_current_coroutine() };
let opt_arcref: Option<ArcRef<_>> = coroutine.user_data.clone().map(|arc| arc.into());
opt_arcref.and_then(|arcref| {
if (&***arcref.owner() as &Any).downcast_ref::<T>().is_some() {
Some(arcref.map(|ud| (&**ud as &Any).downcast_ref::<T>().unwrap())
.erase_owner())
} else {
None
}
})
}
pub fn set_userdata<T: Any + Send + Sync>(data: T) {
let mut coroutine = unsafe { tl_current_coroutine() };
coroutine.user_data = Some(Arc::new(Box::new(data)));
}
pub fn set_children_userdata<T: Any + Send + Sync>(data: Option<T>) {
let mut coroutine = unsafe { tl_current_coroutine() };
coroutine.inherited_user_data = match data {
Some(data) => Some(Arc::new(Box::new(data))),
None => None,
}
}
pub fn thread_num() -> usize {
let coroutine = unsafe { tl_current_coroutine() };
coroutine.handler_shared().thread_num()
}
pub fn sleep(duration: std::time::Duration) {
if in_coroutine() {
let mut timer = Timer::new();
let dur_ms: u64 = duration.as_secs() * 1000 + duration.subsec_nanos() as u64 / 1_000_000;
timer.set_timeout(dur_ms);
let _ = timer.read();
} else {
std::thread::sleep(duration);
}
}
pub fn sleep_ms(time_ms: u64) {
if in_coroutine() {
let mut timer = Timer::new();
timer.set_timeout(time_ms);
let _ = timer.read();
} else {
std::thread::sleep(std::time::Duration::from_millis(time_ms));
}
}
pub fn yield_now() {
let coroutine = unsafe { tl_current_coroutine() };
coroutine.state = coroutine::State::Yielding;
co_debug!(coroutine, "yield");
coroutine::jump_out(&coroutine.self_rc.as_ref().unwrap());
}
pub fn select_wait() -> Event {
let coroutine = unsafe { tl_current_coroutine() };
coroutine.state = coroutine::State::Blocked;
co_debug!(coroutine, "blocked on select");
coroutine::jump_out(&coroutine.self_rc.as_ref().unwrap());
co_debug!(coroutine, "select ret={:?}", coroutine.last_event);
coroutine.last_event
}
#[macro_export]
macro_rules! select {
(@wrap1 ) => {};
(@wrap1 r:$rx:expr => $code:expr, $($tail:tt)*) => {
unsafe {
use $crate::Evented;
$rx.select_add($crate::RW::read());
}
select!(@wrap1 $($tail)*)
};
(@wrap1 r:$rx:expr => $code:expr) => {
unsafe {
use $crate::Evented;
$rx.select_add($crate::RW::read());
}
};
(@wrap1 w:$rx:expr => $code:expr, $($tail:tt)*) => {
unsafe {
use $crate::Evented;
$rx.select_add($crate::RW::write());
}
select!(@wrap1 $($tail)*)
};
(@wrap1 w:$rx:expr => $code:expr) => {
unsafe {
use $crate::Evented;
$rx.select_add($crate::RW::write());
}
};
(@wrap1 rw:$rx:expr => $code:expr, $($tail:tt)*) => {
unsafe {
use $crate::Evented;
$rx.select_add($crate::RW::both());
}
select!(@wrap1 $($tail)*)
};
(@wrap1 rw:$rx:expr => $code:expr) => {
unsafe {
use $crate::Evented;
$rx.select_add($crate::RW::both());
}
};
(@wrap2 $ret:ident) => {
};
(@wrap2 $ret:ident r:$rx:expr => $code:expr, $($tail:tt)*) => {{
use $crate::Evented;
if $ret.id() == $rx.id() { $code }
select!(@wrap2 $ret $($tail)*);
}};
(@wrap2 $ret:ident r:$rx:expr => $code:expr) => {{
use $crate::Evented;
if $ret.id() == $rx.id() { $code }
}};
(@wrap2 $ret:ident w:$rx:expr => $code:expr, $($tail:tt)*) => {{
use $crate::Evented;
if $ret.id() == $rx.id() { $code }
select!(@wrap2 $ret $($tail)*);
}};
(@wrap2 $ret:ident w:$rx:expr => $code:expr) => {{
use $crate::Evented;
if $ret.id() == $rx.id() { $code }
}};
(@wrap2 $ret:ident rw:$rx:expr => $code:expr, $($tail:tt)*) => {{
use $crate::Evented;
if $ret.id() == $rx.id() { $code }
select!(@wrap2 $ret $($tail)*);
}};
(@wrap2 $ret:ident rw:$rx:expr => $code:expr) => {{
use $crate::Evented;
if $ret.id() == $rx.id() { $code }
}};
($($tail:tt)*) => {{
select!(@wrap1 $($tail)*);
let ret = mioco::select_wait();
select!(@wrap2 ret $($tail)*);
}};
}