tarantool_module/
coio.rs

1//! Cooperative input/output
2//!
3//! See also:
4//! - [C API reference: Module coio](https://www.tarantool.io/en/doc/latest/dev_guide/reference_capi/coio/)
5use std::convert::TryFrom;
6use std::ffi::c_void;
7use std::io;
8use std::io::{Read, Write};
9use std::net::TcpListener;
10use std::os::raw::{c_char, c_int};
11use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
12
13use failure::_core::ptr::null_mut;
14
15use crate::error::{Error, TarantoolError};
16use crate::fiber::unpack_callback;
17
18const TIMEOUT_INFINITY: f64 = 365.0 * 86400.0 * 100.0;
19
20/// Uses CoIO main loop to poll read/write events from wrapped socket
21pub struct CoIOStream {
22    fd: RawFd,
23}
24
25bitflags! {
26    /// Event type(s) to wait. Can be `READ` or/and `WRITE`
27    pub struct CoIOFlags: c_int {
28        const READ = 1;
29        const WRITE = 2;
30    }
31}
32
33impl CoIOStream {
34    pub fn new<T>(inner: T) -> Result<CoIOStream, io::Error>
35    where
36        T: IntoRawFd,
37    {
38        let fd = inner.into_raw_fd();
39        let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
40        if flags < 0 {
41            return Err(io::Error::last_os_error());
42        }
43
44        if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 } {
45            Err(io::Error::last_os_error())
46        } else {
47            Ok(CoIOStream { fd })
48        }
49    }
50
51    /// Pull some bytes from this source into the specified buffer. Returns how many bytes were read or 0 on timeout.
52    fn read_with_timeout(&mut self, buf: &mut [u8], timeout: f64) -> Result<usize, io::Error> {
53        let buf_len = buf.len();
54        let result = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf_len) };
55        if result >= 0 {
56            return Ok(result as usize);
57        }
58
59        let err = io::Error::last_os_error();
60        if err.kind() != io::ErrorKind::WouldBlock {
61            return Err(err);
62        }
63
64        coio_wait(self.fd, CoIOFlags::READ, timeout)?;
65        let result = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf_len) };
66        if result < 0 {
67            Err(io::Error::last_os_error())
68        } else {
69            Ok(result as usize)
70        }
71    }
72
73    /// Write a buffer into this writer. Returning how many bytes were written or 0 on timeout.
74    fn write_with_timeout(&mut self, buf: &[u8], timeout: f64) -> Result<usize, io::Error> {
75        let result = unsafe { libc::write(self.fd, buf.as_ptr() as *mut c_void, buf.len()) };
76        if result >= 0 {
77            return Ok(result as usize);
78        }
79
80        let err = io::Error::last_os_error();
81        if err.kind() != io::ErrorKind::WouldBlock {
82            return Err(err);
83        }
84
85        coio_wait(self.fd, CoIOFlags::WRITE, timeout)?;
86        let result = unsafe { libc::write(self.fd, buf.as_ptr() as *mut c_void, buf.len()) };
87        if result < 0 {
88            Err(io::Error::last_os_error())
89        } else {
90            Ok(result as usize)
91        }
92    }
93}
94
95impl IntoRawFd for CoIOStream {
96    fn into_raw_fd(self) -> RawFd {
97        self.fd
98    }
99}
100
101impl Read for CoIOStream {
102    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
103        self.read_with_timeout(buf, TIMEOUT_INFINITY)
104    }
105}
106
107impl Write for CoIOStream {
108    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
109        self.write_with_timeout(buf, TIMEOUT_INFINITY)
110    }
111
112    fn flush(&mut self) -> Result<(), io::Error> {
113        Ok(())
114    }
115}
116
117impl Drop for CoIOStream {
118    fn drop(&mut self) {
119        unsafe { ffi::coio_close(self.fd) };
120    }
121}
122
123/// Uses CoIO main loop to poll incoming connections from wrapped socket listener
124pub struct CoIOListener {
125    inner: TcpListener,
126}
127
128impl CoIOListener {
129    /// Accept a new incoming connection from this listener.
130    pub fn accept(&self) -> Result<CoIOStream, io::Error> {
131        loop {
132            let res = self.inner.accept();
133            return match res {
134                Ok((stream, _)) => CoIOStream::new(stream),
135
136                Err(e) => {
137                    if e.kind() == io::ErrorKind::WouldBlock {
138                        coio_wait(self.inner.as_raw_fd(), CoIOFlags::READ, TIMEOUT_INFINITY)?;
139                        continue;
140                    }
141                    Err(e)
142                }
143            };
144        }
145    }
146
147    pub fn inner_listener(&mut self) -> &mut TcpListener {
148        &mut self.inner
149    }
150}
151
152impl TryFrom<TcpListener> for CoIOListener {
153    type Error = io::Error;
154
155    fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
156        value.set_nonblocking(true)?;
157        Ok(Self { inner: value })
158    }
159}
160
161/// Wait until `READ` or `WRITE` event on socket (`fd`). Yields.
162///
163/// - `fd` - non-blocking socket file description
164/// - `events` - requested events to wait. Combination of [CoIOFlags::READ | CoIOFlags::WRITE](struct.CoIOFlags.html) bit flags.
165/// - `timeoout` - timeout in seconds.
166pub fn coio_wait(fd: RawFd, flags: CoIOFlags, timeout: f64) -> Result<(), io::Error> {
167    match unsafe { ffi::coio_wait(fd, flags.bits, timeout) } {
168        0 => Err(io::ErrorKind::TimedOut.into()),
169        _ => Ok(()),
170    }
171}
172
173/// Create new eio task with specified function and
174/// arguments. Yield and wait until the task is complete
175/// or a timeout occurs.
176///
177/// This function doesn't throw exceptions to avoid double error
178/// checking: in most cases it's also necessary to check the return
179/// value of the called function and perform necessary actions. If
180/// func sets errno, the errno is preserved across the call.
181///
182/// Returns:
183/// - `-1` and `errno = ENOMEM` if failed to create a task
184/// - the function return (errno is preserved).
185///
186/// ```
187/// struct FuncArgs {}
188///
189/// fn func(args: FuncArgs) -> i32 {}
190///
191/// if coio_call(func, FuncArgs{}) == -1 {
192///		// handle errors.
193/// }
194/// ```
195pub fn coio_call<F, T>(callback: &mut F, arg: T) -> isize
196where
197    F: FnMut(Box<T>) -> i32,
198{
199    let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
200    unsafe { ffi::coio_call(trampoline, callback_ptr, Box::into_raw(Box::<T>::new(arg))) }
201}
202
203/// Fiber-friendly version of `getaddrinfo(3)`.
204///
205/// - `host` - host name, i.e. "tarantool.org"
206/// - `port` - service name, i.e. "80" or "http"
207/// - `hints` - hints, see `getaddrinfo(3)`
208/// - `timeout` - timeout
209pub fn getaddrinfo(
210    host: &str,
211    port: &str,
212    hints: &libc::addrinfo,
213    timeout: f64,
214) -> Result<libc::addrinfo, Error> {
215    let mut result: *mut libc::addrinfo = null_mut();
216    if unsafe {
217        ffi::coio_getaddrinfo(
218            host.as_ptr() as *const c_char,
219            port.as_ptr() as *const c_char,
220            &*hints,
221            &mut result,
222            timeout,
223        )
224    } < 0
225    {
226        Err(TarantoolError::last().into())
227    } else {
228        Ok(unsafe { result.read() })
229    }
230}
231
232mod ffi {
233    use std::os::raw::{c_char, c_int};
234
235    use va_list::VaList;
236
237    extern "C" {
238        pub fn coio_wait(fd: c_int, event: c_int, timeout: f64) -> c_int;
239        pub fn coio_close(fd: c_int) -> c_int;
240        pub fn coio_getaddrinfo(
241            host: *const c_char,
242            port: *const c_char,
243            hints: *const libc::addrinfo,
244            res: *mut *mut libc::addrinfo,
245            timeout: f64,
246        ) -> c_int;
247        pub fn coio_call(func: Option<unsafe extern "C" fn(VaList) -> c_int>, ...) -> isize;
248    }
249}