1use std::convert::TryFrom;
6use std::ffi::c_void;
7use std::io;
8use std::io::{Read, Write};
9use std::net::TcpListener;
10use std::os::raw::{c_char, c_int};
11use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
12
13use failure::_core::ptr::null_mut;
14
15use crate::error::{Error, TarantoolError};
16use crate::fiber::unpack_callback;
17
18const TIMEOUT_INFINITY: f64 = 365.0 * 86400.0 * 100.0;
19
20pub struct CoIOStream {
22 fd: RawFd,
23}
24
25bitflags! {
26 pub struct CoIOFlags: c_int {
28 const READ = 1;
29 const WRITE = 2;
30 }
31}
32
33impl CoIOStream {
34 pub fn new<T>(inner: T) -> Result<CoIOStream, io::Error>
35 where
36 T: IntoRawFd,
37 {
38 let fd = inner.into_raw_fd();
39 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
40 if flags < 0 {
41 return Err(io::Error::last_os_error());
42 }
43
44 if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 } {
45 Err(io::Error::last_os_error())
46 } else {
47 Ok(CoIOStream { fd })
48 }
49 }
50
51 fn read_with_timeout(&mut self, buf: &mut [u8], timeout: f64) -> Result<usize, io::Error> {
53 let buf_len = buf.len();
54 let result = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf_len) };
55 if result >= 0 {
56 return Ok(result as usize);
57 }
58
59 let err = io::Error::last_os_error();
60 if err.kind() != io::ErrorKind::WouldBlock {
61 return Err(err);
62 }
63
64 coio_wait(self.fd, CoIOFlags::READ, timeout)?;
65 let result = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf_len) };
66 if result < 0 {
67 Err(io::Error::last_os_error())
68 } else {
69 Ok(result as usize)
70 }
71 }
72
73 fn write_with_timeout(&mut self, buf: &[u8], timeout: f64) -> Result<usize, io::Error> {
75 let result = unsafe { libc::write(self.fd, buf.as_ptr() as *mut c_void, buf.len()) };
76 if result >= 0 {
77 return Ok(result as usize);
78 }
79
80 let err = io::Error::last_os_error();
81 if err.kind() != io::ErrorKind::WouldBlock {
82 return Err(err);
83 }
84
85 coio_wait(self.fd, CoIOFlags::WRITE, timeout)?;
86 let result = unsafe { libc::write(self.fd, buf.as_ptr() as *mut c_void, buf.len()) };
87 if result < 0 {
88 Err(io::Error::last_os_error())
89 } else {
90 Ok(result as usize)
91 }
92 }
93}
94
95impl IntoRawFd for CoIOStream {
96 fn into_raw_fd(self) -> RawFd {
97 self.fd
98 }
99}
100
101impl Read for CoIOStream {
102 fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
103 self.read_with_timeout(buf, TIMEOUT_INFINITY)
104 }
105}
106
107impl Write for CoIOStream {
108 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
109 self.write_with_timeout(buf, TIMEOUT_INFINITY)
110 }
111
112 fn flush(&mut self) -> Result<(), io::Error> {
113 Ok(())
114 }
115}
116
117impl Drop for CoIOStream {
118 fn drop(&mut self) {
119 unsafe { ffi::coio_close(self.fd) };
120 }
121}
122
123pub struct CoIOListener {
125 inner: TcpListener,
126}
127
128impl CoIOListener {
129 pub fn accept(&self) -> Result<CoIOStream, io::Error> {
131 loop {
132 let res = self.inner.accept();
133 return match res {
134 Ok((stream, _)) => CoIOStream::new(stream),
135
136 Err(e) => {
137 if e.kind() == io::ErrorKind::WouldBlock {
138 coio_wait(self.inner.as_raw_fd(), CoIOFlags::READ, TIMEOUT_INFINITY)?;
139 continue;
140 }
141 Err(e)
142 }
143 };
144 }
145 }
146
147 pub fn inner_listener(&mut self) -> &mut TcpListener {
148 &mut self.inner
149 }
150}
151
152impl TryFrom<TcpListener> for CoIOListener {
153 type Error = io::Error;
154
155 fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
156 value.set_nonblocking(true)?;
157 Ok(Self { inner: value })
158 }
159}
160
161pub fn coio_wait(fd: RawFd, flags: CoIOFlags, timeout: f64) -> Result<(), io::Error> {
167 match unsafe { ffi::coio_wait(fd, flags.bits, timeout) } {
168 0 => Err(io::ErrorKind::TimedOut.into()),
169 _ => Ok(()),
170 }
171}
172
173pub fn coio_call<F, T>(callback: &mut F, arg: T) -> isize
196where
197 F: FnMut(Box<T>) -> i32,
198{
199 let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
200 unsafe { ffi::coio_call(trampoline, callback_ptr, Box::into_raw(Box::<T>::new(arg))) }
201}
202
203pub fn getaddrinfo(
210 host: &str,
211 port: &str,
212 hints: &libc::addrinfo,
213 timeout: f64,
214) -> Result<libc::addrinfo, Error> {
215 let mut result: *mut libc::addrinfo = null_mut();
216 if unsafe {
217 ffi::coio_getaddrinfo(
218 host.as_ptr() as *const c_char,
219 port.as_ptr() as *const c_char,
220 &*hints,
221 &mut result,
222 timeout,
223 )
224 } < 0
225 {
226 Err(TarantoolError::last().into())
227 } else {
228 Ok(unsafe { result.read() })
229 }
230}
231
232mod ffi {
233 use std::os::raw::{c_char, c_int};
234
235 use va_list::VaList;
236
237 extern "C" {
238 pub fn coio_wait(fd: c_int, event: c_int, timeout: f64) -> c_int;
239 pub fn coio_close(fd: c_int) -> c_int;
240 pub fn coio_getaddrinfo(
241 host: *const c_char,
242 port: *const c_char,
243 hints: *const libc::addrinfo,
244 res: *mut *mut libc::addrinfo,
245 timeout: f64,
246 ) -> c_int;
247 pub fn coio_call(func: Option<unsafe extern "C" fn(VaList) -> c_int>, ...) -> isize;
248 }
249}