use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use std::time::Duration;
use self::io_impl::co_io_err::Error;
use self::io_impl::net as net_impl;
use crate::io as io_impl;
use crate::std::sync::atomic_dur::AtomicDuration;
use crate::yield_now::yield_with;
fn set_nonblocking<T: AsRawFd>(fd: &T, nb: bool) -> io::Result<()> {
unsafe {
let fd = fd.as_raw_fd();
let r = libc::fcntl(fd, libc::F_GETFL);
if r == -1 {
return Err(io::Error::last_os_error());
}
let r = if nb {
libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK)
} else {
libc::fcntl(fd, libc::F_SETFL, r & !libc::O_NONBLOCK)
};
if r == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
}
#[derive(Debug)]
pub struct CoIo<T: AsRawFd> {
inner: T,
io: io_impl::IoData,
ctx: io_impl::IoContext,
read_timeout: AtomicDuration,
write_timeout: AtomicDuration,
}
impl<T: AsRawFd> io_impl::AsIoData for CoIo<T> {
fn as_io_data(&self) -> &io_impl::IoData {
&self.io
}
}
impl<T: AsRawFd> AsRawFd for CoIo<T> {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
impl<T: AsRawFd + IntoRawFd> IntoRawFd for CoIo<T> {
fn into_raw_fd(self) -> RawFd {
self.inner.into_raw_fd()
}
}
impl<T: AsRawFd> CoIo<T> {
pub fn new(io: T) -> Result<Self, Error<T>> {
let io_data = match io_impl::add_socket(&io) {
Ok(o) => o,
Err(e) => return Err(Error::new(e, io)),
};
match set_nonblocking(&io, true) {
Ok(_) => {}
Err(e) => return Err(Error::new(e, io)),
}
Ok(CoIo {
inner: io,
io: io_data,
ctx: io_impl::IoContext::new(),
read_timeout: AtomicDuration::new(None),
write_timeout: AtomicDuration::new(None),
})
}
pub(crate) fn from_raw(io: T, io_data: io_impl::IoData) -> Self {
CoIo {
inner: io,
io: io_data,
ctx: io_impl::IoContext::new(),
read_timeout: AtomicDuration::new(None),
write_timeout: AtomicDuration::new(None),
}
}
pub(crate) fn io_reset(&self) {
self.io.reset()
}
pub(crate) fn ctx_check(&self) -> io::Result<bool> {
self.ctx.check_nonblocking(|b| set_nonblocking(self, b))?;
self.ctx.check_context(|b| set_nonblocking(self, b))
}
pub fn inner(&self) -> &T {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
Ok(self.read_timeout.get())
}
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
Ok(self.write_timeout.get())
}
pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.read_timeout.swap(dur);
Ok(())
}
pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.write_timeout.swap(dur);
Ok(())
}
pub fn set_nonblocking(&self, nb: bool) -> io::Result<()> {
self.ctx.set_nonblocking(nb);
Ok(())
}
}
impl<T: AsRawFd + Read> Read for CoIo<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if !self.ctx_check()? {
return self.inner.read(buf);
}
self.io.reset();
match self.inner.read(buf) {
Ok(n) => return Ok(n),
Err(e) => {
let raw_err = e.raw_os_error();
if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
} else {
return Err(e);
}
}
}
let mut reader = net_impl::SocketRead::new(self, buf, self.read_timeout.get());
yield_with(&reader);
reader.done()
}
}
impl<T: AsRawFd + Write> Write for CoIo<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if !self.ctx_check()? {
return self.inner.write(buf);
}
self.io.reset();
match self.inner.write(buf) {
Ok(n) => return Ok(n),
Err(e) => {
let raw_err = e.raw_os_error();
if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
} else {
return Err(e);
}
}
}
let mut writer = net_impl::SocketWrite::new(self, buf, self.write_timeout.get());
yield_with(&writer);
writer.done()
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
#[cfg(test)]
mod tests {
#![feature(test)]
use super::*;
#[allow(dead_code)]
fn compile_co_io() {
#[derive(Debug)]
struct Fd;
impl AsRawFd for Fd {
fn as_raw_fd(&self) -> RawFd {
0
}
}
impl Read for Fd {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
Ok(0)
}
}
let a = Fd;
let mut io = CoIo::new(a).unwrap();
let mut buf = [0u8; 100];
io.read(&mut buf).unwrap();
}
}