use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::ffi::c_void;
use std::io::{self, Read, Write};
use std::mem::forget;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use std::rc::Rc;
use std::time::Duration;
use core::ptr::null_mut;
use crate::error::{Error, TarantoolError};
use crate::ffi::tarantool as ffi;
use crate::fiber::{unpack_callback, Cond};
const TIMEOUT_INFINITY: f64 = 365.0 * 86400.0 * 100.0;
pub struct CoIOStream {
fd: RawFd,
}
impl CoIOStream {
pub fn new<T>(inner: T) -> Result<CoIOStream, io::Error>
where
T: IntoRawFd,
{
let fd = inner.into_raw_fd();
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
if flags < 0 {
return Err(io::Error::last_os_error());
}
if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 } {
Err(io::Error::last_os_error())
} else {
Ok(CoIOStream { fd })
}
}
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<CoIOStream, io::Error> {
let inner_stream = TcpStream::connect(addr)?;
inner_stream.set_nonblocking(true)?;
Ok(CoIOStream {
fd: inner_stream.into_raw_fd(),
})
}
pub fn connect_timeout(addr: &SocketAddr, timeout: Duration) -> Result<CoIOStream, io::Error> {
let inner_stream = TcpStream::connect_timeout(addr, timeout)?;
inner_stream.set_nonblocking(true)?;
Ok(CoIOStream {
fd: inner_stream.into_raw_fd(),
})
}
pub fn read_with_timeout(
&mut self,
buf: &mut [u8],
timeout: Option<Duration>,
) -> Result<usize, io::Error> {
read(self.fd, buf, timeout)
}
pub fn write_with_timeout(
&mut self,
buf: &[u8],
timeout: Option<Duration>,
) -> Result<usize, io::Error> {
write(self.fd, buf, timeout)
}
}
impl IntoRawFd for CoIOStream {
fn into_raw_fd(self) -> RawFd {
let fd = self.fd;
forget(self);
fd
}
}
impl AsRawFd for CoIOStream {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
impl Read for CoIOStream {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
self.read_with_timeout(buf, None)
}
}
impl Write for CoIOStream {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.write_with_timeout(buf, None)
}
fn flush(&mut self) -> Result<(), io::Error> {
Ok(())
}
}
impl Drop for CoIOStream {
fn drop(&mut self) {
unsafe { ffi::coio_close(self.fd) };
}
}
pub struct CoIOListener {
inner: TcpListener,
}
impl CoIOListener {
pub fn accept(&self) -> Result<CoIOStream, io::Error> {
loop {
let res = self.inner.accept();
return match res {
Ok((stream, _)) => CoIOStream::new(stream),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
coio_wait(
self.inner.as_raw_fd(),
ffi::CoIOFlags::READ,
TIMEOUT_INFINITY,
)?;
continue;
}
Err(e)
}
};
}
}
pub fn inner_listener(&mut self) -> &mut TcpListener {
&mut self.inner
}
}
impl TryFrom<TcpListener> for CoIOListener {
type Error = io::Error;
fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
value.set_nonblocking(true)?;
Ok(Self { inner: value })
}
}
pub fn coio_wait(fd: RawFd, flags: ffi::CoIOFlags, timeout: f64) -> Result<(), io::Error> {
match unsafe { ffi::coio_wait(fd, flags.bits(), timeout) } {
0 => Err(io::ErrorKind::TimedOut.into()),
_ => Ok(()),
}
}
pub fn coio_call<F, T>(callback: &mut F, arg: T) -> isize
where
F: FnMut(Box<T>) -> i32,
{
let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
unsafe { ffi::coio_call(trampoline, callback_ptr, Box::into_raw(Box::<T>::new(arg))) }
}
pub unsafe fn getaddrinfo(
host: &std::ffi::CStr,
port: Option<&std::ffi::CStr>,
hints: &libc::addrinfo,
timeout: f64,
) -> Result<*mut libc::addrinfo, Error> {
let mut result = null_mut();
let port = if let Some(p) = port {
p.as_ptr()
} else {
null_mut()
};
if unsafe { ffi::coio_getaddrinfo(host.as_ptr(), port, hints, &mut result, timeout) } < 0 {
Err(TarantoolError::last().into())
} else {
Ok(result)
}
}
#[inline(always)]
pub(crate) fn read(
fd: RawFd,
buf: &mut [u8],
timeout: Option<Duration>,
) -> Result<usize, io::Error> {
let buf_len = buf.len();
let result = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut c_void, buf_len) };
if result >= 0 {
return Ok(result as usize);
}
let err = io::Error::last_os_error();
if err.kind() != io::ErrorKind::WouldBlock {
return Err(err);
}
let timeout = match timeout {
None => TIMEOUT_INFINITY,
Some(timeout) => timeout.as_secs_f64(),
};
coio_wait(fd, ffi::CoIOFlags::READ, timeout)?;
let result = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut c_void, buf_len) };
if result < 0 {
Err(io::Error::last_os_error())
} else {
Ok(result as usize)
}
}
#[inline(always)]
pub(crate) fn write(fd: RawFd, buf: &[u8], timeout: Option<Duration>) -> Result<usize, io::Error> {
let result = unsafe { libc::write(fd, buf.as_ptr() as *mut c_void, buf.len()) };
if result >= 0 {
return Ok(result as usize);
}
let err = io::Error::last_os_error();
if err.kind() != io::ErrorKind::WouldBlock {
return Err(err);
}
let timeout = match timeout {
None => TIMEOUT_INFINITY,
Some(timeout) => timeout.as_secs_f64(),
};
coio_wait(fd, ffi::CoIOFlags::WRITE, timeout)?;
let result = unsafe { libc::write(fd, buf.as_ptr() as *mut c_void, buf.len()) };
if result < 0 {
Err(io::Error::last_os_error())
} else {
Ok(result as usize)
}
}
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let chan = Rc::new(Chan {
buffer: RefCell::new(VecDeque::with_capacity(capacity)),
cond: Cond::new(),
tx_count: Cell::new(1),
rx_is_active: Cell::new(true),
});
(Sender(chan.clone()), Receiver(chan))
}
pub struct Sender<T>(Rc<Chan<T>>);
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), io::Error> {
if !self.0.rx_is_active.get() {
return Err(io::ErrorKind::NotConnected.into());
}
let was_empty = {
let mut buffer = self.0.buffer.borrow_mut();
let was_empty = buffer.len() == 0;
buffer.push_back(value);
was_empty
};
if was_empty {
self.0.cond.signal();
}
Ok(())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.0.tx_count.set(self.0.tx_count.get() + 1);
Sender(self.0.clone())
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.0.tx_count.set(self.0.tx_count.get() - 1);
self.0.cond.signal();
}
}
pub struct Receiver<T>(Rc<Chan<T>>);
impl<T> Receiver<T> {
pub fn recv(&self) -> Option<T> {
if self.0.buffer.borrow().len() == 0 {
if self.0.tx_count.get() == 0 {
return None;
}
self.0.cond.wait();
}
self.0.buffer.borrow_mut().pop_front()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.0.rx_is_active.set(false);
}
}
struct Chan<T> {
buffer: RefCell<VecDeque<T>>,
cond: Cond,
tx_count: Cell<usize>,
rx_is_active: Cell<bool>,
}