use super::{RW, EventSourceId, coroutine};
use super::thread::Handler;
use super::thread::tl_current_coroutine;
use super::token_from_ids;
use mio_orig;
use mio_orig::{EventLoop, Token, EventSet};
use std::io;
use std::rc::Rc;
use std::cell::{RefCell, Ref, RefMut};
#[cfg(not(windows))]
use std::os::unix::io::{RawFd, FromRawFd, AsRawFd};
/// Caller-facing interface of an event source that a coroutine can wait on.
///
/// A blanket impl in this file provides this trait for every `EventedImpl`
/// type by forwarding each method to the matching `*_prv` method.
pub trait Evented {
/// Adds this event source, with interest `rw`, to the list of sources the
/// current coroutine is blocked on.
///
/// # Safety
///
/// NOTE(review): the exact contract is not stated in this file. The default
/// implementation reads the thread-local current coroutine, so this
/// presumably must only be called from inside a running coroutine — confirm.
unsafe fn select_add(&self, rw: RW);
/// Blocks the current coroutine on this source until it is resumed.
#[doc(hidden)]
fn block_on(&self, rw: RW);
/// Returns the id assigned to this event source.
///
/// The implementation provided in this file panics if no id was assigned yet.
#[doc(hidden)]
fn id(&self) -> EventSourceId;
}
/// Implementation half of `Evented`.
///
/// Implementors only supply `shared()`; the `*_prv` default methods contain
/// the actual logic and are exposed to users through the blanket
/// `Evented` impl.
pub trait EventedImpl {
    /// Underlying event source type stored behind the shared handle.
    type Raw: EventSourceTrait + 'static;

    /// Returns the reference-counted handle wrapping the event source.
    fn shared(&self) -> &RcEventSource<Self::Raw>;

    /// Blocks the current coroutine on this source for `rw`, switches out of
    /// the coroutine, and logs the waking event once execution resumes here.
    #[doc(hidden)]
    fn block_on_prv(&self, rw: RW) {
        let coroutine = unsafe { tl_current_coroutine() };
        let source = self.shared();
        coroutine.block_on(source, rw);
        // Execution continues past this call only after the coroutine is resumed.
        coroutine::jump_out(&coroutine.self_rc.as_ref().unwrap());
        co_debug!(coroutine, "resumed due to event {:?}", coroutine.last_event);
    }

    /// Returns the id assigned to this source.
    ///
    /// # Panics
    ///
    /// Panics if no id has been assigned yet.
    #[doc(hidden)]
    fn id_prv(&self) -> EventSourceId {
        let assigned = self.shared().common_ref().id;
        assigned.unwrap()
    }

    /// Appends this source to the current coroutine's `blocked_on` list.
    ///
    /// The source's id becomes its index in that list and its recorded
    /// interest becomes `rw`.
    ///
    /// # Safety
    ///
    /// NOTE(review): relies on `tl_current_coroutine()`, so presumably must be
    /// called from inside a running coroutine — confirm.
    #[doc(hidden)]
    unsafe fn select_add_prv(&self, rw: RW) {
        let current = tl_current_coroutine();
        let source = self.shared();
        let slot = EventSourceId::new(current.blocked_on.len());
        {
            // Scope the mutable borrow so it is released before `to_trait()`
            // clones the handle below.
            let mut common = source.common_mut();
            common.id = Some(slot);
            common.blocked_on = rw;
        }
        current.blocked_on.push(source.to_trait());
    }
}
// NOTE(review): `MioAdapter` wraps an `RcEventSource`, which in this file is an
// `Rc<RefCell<..>>` and therefore not `Send` by itself. This impl is a manual
// promise that moving the adapter across threads is fine; the invariant that
// makes it sound is not visible in this file — confirm before relying on it.
unsafe impl<T> Send for MioAdapter<T> where T: mio_orig::Evented + Send {}
/// Adapter that wraps a mio type in a shared `RcEventSource` so it can be used
/// through the `Evented` / `EventedImpl` traits of this module.
pub struct MioAdapter<MT>(RcEventSource<MT>);
impl<MT> MioAdapter<MT>
where
    MT: mio_orig::Evented + 'static,
{
    /// Wraps `mio_type` in a fresh shared event source.
    pub fn new(mio_type: MT) -> Self {
        let source = RcEventSource::new(mio_type);
        MioAdapter(source)
    }
}
impl<MT> EventedImpl for MioAdapter<MT>
where MT: mio_orig::Evented + 'static
{
type Raw = MT;
fn shared(&self) -> &RcEventSource<Self::Raw> {
&self.0
}
}
impl<MT> MioAdapter<MT>
where
    MT: mio_orig::Evented + mio_orig::TryRead + 'static,
{
    /// Attempts a read into `buf` without blocking the coroutine.
    ///
    /// Forwards the result of the wrapped type's `try_read` unchanged.
    pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>> {
        let mut io = self.shared().io_mut();
        io.try_read(buf)
    }
}
impl<MT> io::Read for MioAdapter<MT>
where
    MT: mio_orig::Evented + mio_orig::TryRead + 'static,
{
    /// Reads into `buf`, blocking the current coroutine until data arrives.
    ///
    /// Whenever `try_read` yields `Ok(None)`, the coroutine blocks on
    /// readability and the read is retried after it resumes. Errors are
    /// returned unchanged.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            match self.try_read(buf) {
                Ok(Some(count)) => return Ok(count),
                Err(err) => return Err(err),
                Ok(None) => {}
            }
            self.block_on(RW::read());
        }
    }
}
impl<MT> MioAdapter<MT>
where
    MT: mio_orig::Evented + 'static + mio_orig::TryWrite,
{
    /// Attempts to write `buf` without blocking the coroutine.
    ///
    /// Forwards the result of the wrapped type's `try_write` unchanged.
    pub fn try_write(&self, buf: &[u8]) -> io::Result<Option<usize>> {
        let mut io = self.shared().io_mut();
        io.try_write(buf)
    }
}
impl<MT> io::Write for MioAdapter<MT>
where
    MT: mio_orig::Evented + 'static + mio_orig::TryWrite,
{
    /// Writes from `buf`, blocking the current coroutine until it can proceed.
    ///
    /// Whenever `try_write` yields `Ok(None)`, the coroutine blocks on
    /// writability and the write is retried after it resumes. Errors are
    /// returned unchanged.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        loop {
            match self.try_write(buf) {
                Ok(Some(written)) => return Ok(written),
                Err(err) => return Err(err),
                Ok(None) => {}
            }
            self.block_on(RW::write());
        }
    }

    /// No-op: this adapter keeps no buffer of its own.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
impl<MT, O> MioAdapter<MT>
where
    MT: mio_orig::Evented + 'static + mio_orig::TryAccept<Output = O>,
    O: mio_orig::Evented + 'static,
{
    /// Attempts to accept a connection without blocking the coroutine.
    ///
    /// An accepted connection is wrapped in its own `MioAdapter`; `Ok(None)`
    /// and errors from the wrapped type are passed through.
    pub fn try_accept(&self) -> io::Result<Option<MioAdapter<O>>> {
        let accepted = self.shared().io_ref().accept();
        accepted.map(|maybe_io| maybe_io.map(MioAdapter::new))
    }
}
impl<MT, O> MioAdapter<MT>
where
    MT: mio_orig::Evented + 'static + mio_orig::TryAccept<Output = O>,
    O: mio_orig::Evented + 'static,
{
    /// Accepts a connection, blocking the current coroutine until one arrives.
    ///
    /// Whenever `try_accept` yields `Ok(None)`, the coroutine blocks on
    /// readability and the accept is retried after it resumes. Errors are
    /// returned unchanged.
    pub fn accept(&self) -> io::Result<MioAdapter<O>> {
        loop {
            match self.try_accept() {
                Ok(Some(conn)) => return Ok(conn),
                Err(err) => return Err(err),
                Ok(None) => {}
            }
            self.block_on(RW::read());
        }
    }
}
#[cfg(not(windows))]
impl<MT> FromRawFd for MioAdapter<MT>
where MT: mio_orig::Evented + 'static + FromRawFd
{
unsafe fn from_raw_fd(fd: RawFd) -> Self {
MioAdapter(RcEventSource::new(MT::from_raw_fd(fd)))
}
}
#[cfg(not(windows))]
impl<MT> AsRawFd for MioAdapter<MT>
where
    MT: mio_orig::Evented + 'static + AsRawFd,
{
    /// Returns the raw file descriptor of the wrapped mio type.
    ///
    /// # Panics
    ///
    /// Panics if the shared cell is currently mutably borrowed.
    fn as_raw_fd(&self) -> RawFd {
        // `AsRawFd::as_raw_fd` only needs `&self`, so take a shared borrow.
        // A mutable borrow here would panic whenever any other borrow of the
        // cell (for example an outstanding `io_ref()`) is still alive.
        self.shared().0.borrow().io.as_raw_fd()
    }
}
/// Blanket impl: every `EventedImpl` type gets `Evented` by forwarding to the
/// corresponding `*_prv` method.
impl<R, EP> Evented for EP
where
    EP: EventedImpl<Raw = R>,
    R: EventSourceTrait + 'static,
{
    unsafe fn select_add(&self, rw: RW) {
        self.select_add_prv(rw)
    }

    fn block_on(&self, rw: RW) {
        self.block_on_prv(rw)
    }

    fn id(&self) -> EventSourceId {
        self.id_prv()
    }
}
/// Low-level (de)registration of an event source with the mio event loop.
///
/// This file provides a blanket impl for every `mio_orig::Evented` type and a
/// forwarding impl for `RcEventSource<T>`.
pub trait EventSourceTrait {
/// Registers the source with `event_loop` under `token` for `interest`.
///
/// NOTE(review): the meaning of the returned `bool` is not defined in this
/// file; the blanket impl always returns `false` — confirm with the callers.
fn register(&mut self, event_loop: &mut EventLoop<Handler>, token: Token, interest: EventSet) -> bool;
/// Updates an existing registration to `token` / `interest`.
///
/// NOTE(review): same unspecified `bool` result as `register` — confirm.
fn reregister(&mut self,
event_loop: &mut EventLoop<Handler>,
token: Token,
interest: EventSet) -> bool;
/// Removes the source from `event_loop`.
fn deregister(&mut self, event_loop: &mut EventLoop<Handler>, token: Token);
}
/// Blanket impl for plain mio event sources.
///
/// Registrations are edge-triggered. Every method panics if the underlying
/// event-loop call fails; `register` and `reregister` always return `false`.
impl<T> EventSourceTrait for T
where
    T: mio_orig::Evented,
{
    fn register(&mut self, event_loop: &mut EventLoop<Handler>, token: Token, interest: EventSet) -> bool {
        let edge = mio_orig::PollOpt::edge();
        let outcome = event_loop.register(self, token, interest, edge);
        outcome.expect("register failed");
        false
    }

    fn reregister(&mut self,
                  event_loop: &mut EventLoop<Handler>,
                  token: Token,
                  interest: EventSet) -> bool {
        let edge = mio_orig::PollOpt::edge();
        let outcome = event_loop.reregister(self, token, interest, edge);
        outcome.expect("reregister failed");
        false
    }

    fn deregister(&mut self, event_loop: &mut EventLoop<Handler>, _token: Token) {
        let outcome = event_loop.deregister(self);
        outcome.expect("deregister failed");
    }
}
/// Interface of a shared event source addressed by coroutine id.
///
/// `RcEventSource::to_trait` returns a boxed object of this trait, which
/// `select_add_prv` stores in the coroutine's `blocked_on` list.
pub trait RcEventSourceTrait {
/// Registers the source with `event_loop` on behalf of coroutine `co_id`.
///
/// NOTE(review): the meaning of the returned `bool` is not defined in this
/// file — confirm with the callers.
fn register(&mut self, event_loop: &mut EventLoop<Handler>, co_id: coroutine::Id) -> bool;
/// Updates the registration of the source for coroutine `co_id`.
fn reregister(&mut self, event_loop: &mut EventLoop<Handler>, co_id: coroutine::Id) -> bool;
/// Removes the source from `event_loop`.
fn deregister(&mut self, event_loop: &mut EventLoop<Handler>, co_id: coroutine::Id);
/// Notifies the source of a hang-up; the impl in this file records it in
/// `peer_hup`.
fn hup(&mut self, _event_loop: &mut EventLoop<Handler>, token: Token);
/// Returns the read/write interest currently recorded for the source.
fn blocked_on(&self) -> RW;
/// Returns the id assigned to the source, or `None` if none was assigned.
fn id(&self) -> Option<EventSourceId>;
}
/// Bookkeeping stored next to every event source, independent of the I/O type.
pub struct EventSourceCommon {
/// Id of the source; starts as `None`. `select_add_prv` sets it to the
/// source's index in the coroutine's `blocked_on` list.
pub id: Option<EventSourceId>,
/// Read/write interest recorded for the source; starts as `RW::none()`.
pub blocked_on: RW,
/// Set to `true` by `hup`. Once set, registration in this file no longer
/// requests hup or readable interest.
pub peer_hup: bool,
}
/// Contents of the shared cell behind an `RcEventSource`: the common
/// bookkeeping plus the wrapped I/O object.
pub struct RcEventSourceShared<T> {
// Id, interest and hang-up state of the source.
common: EventSourceCommon,
// The wrapped I/O object itself.
io: T,
}
impl<T> RcEventSourceShared<T> {
    /// Wraps `t` with fresh bookkeeping: no id, no interest, no hang-up seen.
    pub fn new(t: T) -> Self {
        let common = EventSourceCommon {
            id: None,
            blocked_on: RW::none(),
            peer_hup: false,
        };
        RcEventSourceShared {
            common: common,
            io: t,
        }
    }
}
/// Reference-counted, interior-mutable handle to an event source and its
/// bookkeeping (`Rc<RefCell<RcEventSourceShared<T>>>`).
pub struct RcEventSource<T>(Rc<RefCell<RcEventSourceShared<T>>>);
impl<T> RcEventSource<T> {
    /// Creates a new shared handle owning `t`.
    pub fn new(t: T) -> Self {
        let cell = RefCell::new(RcEventSourceShared::new(t));
        RcEventSource(Rc::new(cell))
    }

    /// Borrows the wrapped I/O object immutably.
    ///
    /// Panics if the cell is currently mutably borrowed.
    pub fn io_ref(&self) -> Ref<T> {
        let shared = self.0.borrow();
        Ref::map(shared, |inner| &inner.io)
    }

    /// Borrows the wrapped I/O object mutably.
    ///
    /// Panics if the cell is currently borrowed.
    pub fn io_mut(&self) -> RefMut<T> {
        let shared = self.0.borrow_mut();
        RefMut::map(shared, |inner| &mut inner.io)
    }

    /// Borrows the common bookkeeping immutably.
    ///
    /// Panics if the cell is currently mutably borrowed.
    #[allow(unused)]
    #[doc(hidden)]
    pub fn common_ref(&self) -> Ref<EventSourceCommon> {
        let shared = self.0.borrow();
        Ref::map(shared, |inner| &inner.common)
    }

    /// Borrows the common bookkeeping mutably.
    ///
    /// Panics if the cell is currently borrowed.
    #[doc(hidden)]
    pub fn common_mut(&self) -> RefMut<EventSourceCommon> {
        let shared = self.0.borrow_mut();
        RefMut::map(shared, |inner| &mut inner.common)
    }
}
impl<T> RcEventSourceTrait for RcEventSource<T>
where T: EventSourceTrait
{
fn blocked_on(&self) -> RW {
self.0.borrow().common.blocked_on
}
fn id(&self) -> Option<EventSourceId> {
self.0.borrow().common.id
}
fn hup(&mut self, _event_loop: &mut EventLoop<Handler>, _token: Token) {
trace!("hup");
self.0.borrow_mut().common.peer_hup = true;
}
fn register(&mut self, event_loop: &mut EventLoop<Handler>, co_id: coroutine::Id) -> bool {
let mut interest = mio_orig::EventSet::none();
if !self.0.borrow().common.peer_hup {
interest = interest | mio_orig::EventSet::hup();
if self.0.borrow().common.blocked_on.has_read() {
interest = interest | mio_orig::EventSet::readable();
}
}
if self.0.borrow().common.blocked_on.has_write() {
interest = interest | mio_orig::EventSet::writable();
}
let token = token_from_ids(co_id, self.0.borrow().common.id.unwrap());
EventSourceTrait::register(&mut self.0.borrow_mut().io, event_loop, token, interest)
}
fn reregister(&mut self, event_loop: &mut EventLoop<Handler>, co_id: coroutine::Id) -> bool {
let mut interest = mio_orig::EventSet::none();
if !self.0.borrow().common.peer_hup {
interest = interest | mio_orig::EventSet::hup();
if self.0.borrow().common.blocked_on.has_read() {
interest = interest | mio_orig::EventSet::readable();
}
}
if self.0.borrow().common.blocked_on.has_write() {
interest = interest | mio_orig::EventSet::writable();
}
let token = token_from_ids(co_id, self.0.borrow().common.id.unwrap());
EventSourceTrait::reregister(&mut self.0.borrow_mut().io, event_loop, token, interest)
}
fn deregister(&mut self, event_loop: &mut EventLoop<Handler>, co_id: coroutine::Id) {
let token = token_from_ids(co_id, self.0.borrow().common.id.unwrap());
EventSourceTrait::deregister(&mut self.0.borrow_mut().io, event_loop, token);
}
}
impl<T> RcEventSource<T>
where T: EventSourceTrait + 'static
{
#[doc(hidden)]
pub fn to_trait(&self) -> Box<RcEventSourceTrait + 'static> {
Box::new(RcEventSource(self.0.clone()))
}
}
impl<T> EventSourceTrait for RcEventSource<T>
where T: EventSourceTrait
{
fn register(&mut self, event_loop: &mut EventLoop<Handler>, token: Token, interest: EventSet) -> bool {
self.0.borrow_mut().io.register(event_loop, token, interest)
}
fn reregister(&mut self,
event_loop: &mut EventLoop<Handler>,
token: Token,
interest: EventSet) -> bool {
self.0.borrow_mut().io.reregister(event_loop, token, interest)
}
fn deregister(&mut self, event_loop: &mut EventLoop<Handler>, token: Token) {
self.0.borrow_mut().io.deregister(event_loop, token);
}
}