//! liburing/lib.rs — hand-written inline helpers layered over the
//! bindgen-generated liburing FFI bindings.

1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4
5use std::mem::transmute;
6
7use libc::{c_int, c_uint, c_ushort, c_void, off_t};
8
9pub const LIBURING_UDATA_TIMEOUT: u64 = 0xFFFFFFFF_FFFFFFFF; // -1
10
11include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
12
// Compiler-only barrier: prevents the compiler from reordering memory
// accesses across this point, but emits no CPU fence instruction.
// NOTE(review): that is sufficient on strongly-ordered x86, but not on
// weakly-ordered architectures (e.g. ARM) — confirm the target set.
macro_rules! io_uring_barrier {
    () => {
        std::sync::atomic::compiler_fence(std::sync::atomic::Ordering::SeqCst);
    };
}
18
// Store with release semantics: all memory operations issued before the
// macro become visible to another CPU before the store itself does.
//
// A real `atomic::fence(Release)` is required here — the previous
// `compiler_fence` only constrained the compiler and emitted no CPU
// barrier, which is incorrect on weakly-ordered architectures such as
// ARM. On x86 a Release fence still compiles to zero instructions, so
// nothing is lost on the common target.
macro_rules! io_uring_smp_store_release {
    ($ptr:expr, $val:expr) => {
        std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
        std::ptr::write_volatile($ptr, $val);
    };
}
25
// Load with acquire semantics: no memory operation issued after the macro
// may be observed before the load itself.
//
// A real `atomic::fence(Acquire)` is required — the previous
// `compiler_fence` constrained only the compiler, which is incorrect on
// weakly-ordered CPUs. On x86 an Acquire fence costs no instructions.
macro_rules! io_uring_smp_load_acquire {
    ($ptr:expr, $res:expr) => {
        $res = std::ptr::read_volatile($ptr);
        std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
    };
}
32
33/*
34 * Must be called after io_uring_for_each_cqe()
35 */
36pub unsafe fn io_uring_cq_advance(ring: *mut io_uring, nr: u32) {
37    if nr > 0 {
38        let cq: *mut io_uring_cq = &mut (*ring).cq;
39
40        /*
41         * Ensure that the kernel only sees the new value of the head
42         * index after the CQEs have been read.
43         */
44        io_uring_smp_store_release!((*cq).khead, *(*cq).khead + nr);
45    }
46}
47
48/*
49 * Must be called after io_uring_{peek,wait}_cqe() after the cqe has
50 * been processed by the application.
51 */
52pub unsafe fn io_uring_cqe_seen(ring: *mut io_uring, cqe: *mut io_uring_cqe) {
53    if !cqe.is_null() {
54        io_uring_cq_advance(ring, 1);
55    }
56}
57
58/*
59struct msghdr {
60    void         *msg_name;       /* optional address */
61    socklen_t     msg_namelen;    /* size of address */
62    struct iovec *msg_iov;        /* scatter/gather array */
63    size_t        msg_iovlen;     /* # elements in msg_iov */
64    void         *msg_control;    /* ancillary data, see below */
65    size_t        msg_controllen; /* ancillary data buffer len */
66    int           msg_flags;      /* flags on received message */
67};
68 */
69
70/*
71 * Command prep helpers
72 */
73pub unsafe fn io_uring_sqe_set_data(sqe: *mut io_uring_sqe, data: *mut c_void) {
74    (*sqe).user_data = data as u64;
75}
76
77pub unsafe fn io_uring_cqe_get_data(cqe: *mut io_uring_cqe) -> *mut c_void {
78    return (*cqe).user_data as *mut c_void;
79}
80
/// Set the IOSQE_* flag bits on an SQE.
///
/// The SQE `flags` field is a single byte, so only the low 8 bits of
/// `flags` survive the cast — matching the C struct layout.
pub unsafe fn io_uring_sqe_set_flags(sqe: *mut io_uring_sqe, flags: c_uint) {
    (*sqe).flags = flags as u8;
}
84
85pub unsafe fn io_uring_prep_rw(
86    op: c_uint,
87    sqe: *mut io_uring_sqe,
88    fd: c_int,
89    addr: *const c_void,
90    len: c_uint,
91    offset: off_t,
92) {
93    (*sqe).opcode = op as u8;
94    (*sqe).flags = 0;
95    (*sqe).ioprio = 0;
96    (*sqe).fd = fd;
97    (*sqe).off = transmute(offset);
98    (*sqe).addr = transmute(addr);
99    (*sqe).len = len;
100    (*sqe).__bindgen_anon_1.rw_flags = 0;
101    (*sqe).user_data = 0;
102    (*sqe).__bindgen_anon_2.__pad2[0] = 0;
103    (*sqe).__bindgen_anon_2.__pad2[1] = 0;
104    (*sqe).__bindgen_anon_2.__pad2[2] = 0;
105}
106
107pub unsafe fn io_uring_prep_readv(
108    sqe: *mut io_uring_sqe,
109    fd: c_int,
110    iovecs: *const libc::iovec,
111    nr_vecs: u32,
112    offset: off_t,
113) {
114    io_uring_prep_rw(
115        IORING_OP_READV,
116        sqe,
117        fd,
118        transmute(iovecs),
119        nr_vecs,
120        offset,
121    );
122}
123
/// Prepare a read into a pre-registered buffer (IORING_OP_READ_FIXED).
///
/// `buf_index` selects which registered buffer `buf` points into;
/// registration itself happens elsewhere (io_uring_register).
pub unsafe fn io_uring_prep_read_fixed(
    sqe: *mut io_uring_sqe,
    fd: i32,
    buf: *mut c_void,
    nbytes: u32,
    offset: off_t,
    buf_index: c_ushort,
) {
    io_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
    (*sqe).__bindgen_anon_2.buf_index = buf_index;
}
135
136pub unsafe fn io_uring_prep_writev(
137    sqe: *mut io_uring_sqe,
138    fd: i32,
139    iovecs: *const libc::iovec,
140    nr_vecs: u32,
141    offset: off_t,
142) {
143    io_uring_prep_rw(
144        IORING_OP_WRITEV,
145        sqe,
146        fd,
147        transmute(iovecs),
148        nr_vecs,
149        offset,
150    );
151}
152
/// Prepare a write from a pre-registered buffer (IORING_OP_WRITE_FIXED).
///
/// `buf_index` selects which registered buffer `buf` points into.
pub unsafe fn io_uring_prep_write_fixed(
    sqe: *mut io_uring_sqe,
    fd: i32,
    buf: *mut c_void,
    nbytes: u32,
    offset: off_t,
    buf_index: u16,
) {
    io_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset);
    (*sqe).__bindgen_anon_2.buf_index = buf_index;
}
164
/// Prepare a recvmsg (IORING_OP_RECVMSG) on `fd`.
///
/// `msg` is expected to point at a `struct msghdr` (see the commented-out
/// layout above); the len of 1 means "one msghdr", and `flags` are the
/// usual recvmsg(2) flags.
pub unsafe fn io_uring_prep_recvmsg(
    sqe: *mut io_uring_sqe,
    fd: i32,
    msg: *mut c_void,
    flags: u32,
) {
    io_uring_prep_rw(IORING_OP_RECVMSG, sqe, fd, msg, 1, 0);
    (*sqe).__bindgen_anon_1.msg_flags = flags;
}
174
/// Prepare a sendmsg (IORING_OP_SENDMSG) on `fd`.
///
/// `msg` is expected to point at a `struct msghdr`; the len of 1 means
/// "one msghdr", and `flags` are the usual sendmsg(2) flags.
pub unsafe fn io_uring_prep_sendmsg(
    sqe: *mut io_uring_sqe,
    fd: i32,
    msg: *const c_void,
    flags: u32,
) {
    io_uring_prep_rw(IORING_OP_SENDMSG, sqe, fd, msg, 1, 0);
    (*sqe).__bindgen_anon_1.msg_flags = flags;
}
184
/// Prepare a one-shot poll (IORING_OP_POLL_ADD) on `fd` for the POLL*
/// events in `poll_mask`.
pub unsafe fn io_uring_prep_poll_add(
    sqe: *mut io_uring_sqe,
    fd: i32,
    poll_mask: u16,
) {
    io_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, std::ptr::null(), 0, 0);
    (*sqe).__bindgen_anon_1.poll_events = poll_mask;
}
193
/// Prepare removal of an in-flight poll request (IORING_OP_POLL_REMOVE).
///
/// `user_data` must equal the value set on the original POLL_ADD SQE —
/// it is carried in the addr field and used by the kernel to find the
/// request. NOTE(review): fd is passed as 0 here while upstream liburing
/// passes -1; the kernel is not expected to read it for this opcode —
/// confirm.
pub unsafe fn io_uring_prep_poll_remove(
    sqe: *mut io_uring_sqe,
    user_data: *mut c_void,
) {
    io_uring_prep_rw(IORING_OP_POLL_REMOVE, sqe, 0, user_data, 0, 0);
}
200
/// Prepare an fsync (IORING_OP_FSYNC) of `fd`.
///
/// `fsync_flags` selects the variant (e.g. IORING_FSYNC_DATASYNC for
/// fdatasync-like behaviour).
pub unsafe fn io_uring_prep_fsync(
    sqe: *mut io_uring_sqe,
    fd: i32,
    fsync_flags: u32,
) {
    io_uring_prep_rw(IORING_OP_FSYNC, sqe, fd, std::ptr::null(), 0, 0);
    (*sqe).__bindgen_anon_1.fsync_flags = fsync_flags;
}
209
/// Prepare a no-op SQE (IORING_OP_NOP) — completes immediately; useful
/// for testing and for waking a waiter.
pub unsafe fn io_uring_prep_nop(sqe: *mut io_uring_sqe) {
    io_uring_prep_rw(IORING_OP_NOP, sqe, 0, std::ptr::null(), 0, 0);
}
213
214pub unsafe fn io_uring_prep_timeout(
215    sqe: *mut io_uring_sqe,
216    ts: *mut __kernel_timespec,
217    count: off_t,
218) {
219    io_uring_prep_rw(IORING_OP_TIMEOUT, sqe, 0, transmute(ts), 1, count);
220}
221
222pub unsafe fn io_uring_sq_space_left(ring: *const io_uring) -> u32 {
223    return (*ring).sq.kring_entries as u32
224        - ((*ring).sq.sqe_tail - (*ring).sq.sqe_head);
225}
226
227pub unsafe fn io_uring_cq_ready(ring: *mut io_uring) -> u32 {
228    let tail: c_uint;
229    io_uring_smp_load_acquire!((*ring).cq.ktail, tail);
230    return tail - *(*ring).cq.khead;
231}
232
/// Scan the CQ ring for a completion without entering the kernel.
///
/// Internal-timeout CQEs (those tagged `LIBURING_UDATA_TIMEOUT`) are
/// consumed here and never surfaced to the caller: a successful timeout
/// simply restarts the scan, while a failed one becomes the returned
/// error with `*cqe_ptr` left null.
///
/// Returns 0 with `*cqe_ptr` set to the next CQE (or null when the ring
/// is empty), or the negative `res` of a failed internal-timeout CQE.
unsafe fn __io_uring_peek_cqe(
    ring: *mut io_uring,
    cqe_ptr: *mut *mut io_uring_cqe,
) -> c_int {
    let mut cqe: *mut io_uring_cqe;
    let mut err: c_int = 0;

    loop {
        /*
         * io_uring_smp_load_acquire() enforces the order of tail
         * and CQE reads.
         */
        let head = *(*ring).cq.khead;
        let tail: c_uint;
        io_uring_smp_load_acquire!((*ring).cq.ktail, tail);
        // head != tail means at least one CQE is available; index into
        // the ring through the power-of-two mask.
        cqe = if head != tail {
            (*ring).cq.cqes.offset((head & *(*ring).cq.kring_mask) as isize)
        } else {
            std::ptr::null_mut()
        };

        if !cqe.is_null() {
            if (*cqe).user_data == LIBURING_UDATA_TIMEOUT {
                if (*cqe).res < 0 {
                    err = (*cqe).res;
                }

                // The internal timeout CQE is consumed either way.
                io_uring_cq_advance(ring, 1);

                if err == 0 {
                    // Timeout completed normally; keep looking for a
                    // real application CQE.
                    continue;
                }

                // Timeout failed: report the error, surface no CQE.
                cqe = std::ptr::null_mut();
            }
        }

        break;
    }

    *cqe_ptr = cqe;

    return err;
}
277
278/*
279 * Return an IO completion, if one is readily available. Returns 0 with
280 * cqe_ptr filled in on success, -errno on failure.
281 */
282pub unsafe fn io_uring_peek_cqe(
283    ring: *mut io_uring,
284    cqe_ptr: *mut *mut io_uring_cqe,
285) -> c_int {
286    let err: i32 = __io_uring_peek_cqe(ring, cqe_ptr);
287    if err != 0 {
288        return err;
289    }
290
291    return __io_uring_get_cqe(ring, cqe_ptr, 0, 0, std::ptr::null_mut());
292}
293
294/*
295 * Return an IO completion, waiting for it if necessary. Returns 0 with
296 * cqe_ptr filled in on success, -errno on failure.
297 */
298pub unsafe fn io_uring_wait_cqe(
299    ring: *mut io_uring,
300    cqe_ptr: *mut *mut io_uring_cqe,
301) -> c_int {
302    let err = __io_uring_peek_cqe(ring, cqe_ptr);
303    if err != 0 {
304        return err;
305    }
306
307    return __io_uring_get_cqe(ring, cqe_ptr, 0, 1, std::ptr::null_mut());
308}
309
#[cfg(test)]
mod tests {
    use std::io::Error;
    use std::mem;

    use crate::*;

    const QUEUE_DEPTH: u32 = 4;

    /// Smoke test: initialize a ring, fill it with NOP SQEs, submit,
    /// reap every completion, and tear the ring down.
    #[test]
    fn test_io_uring_queue_init() {
        let mut ring = unsafe {
            let mut s = mem::MaybeUninit::<io_uring>::uninit();
            let ret = io_uring_queue_init(QUEUE_DEPTH, s.as_mut_ptr(), 0);
            if ret < 0 {
                // liburing reports failure as a negated errno, so the
                // value must be negated before handing it to
                // from_raw_os_error (which expects a positive errno).
                panic!(
                    "io_uring_queue_init: {:?}",
                    Error::from_raw_os_error(-ret)
                );
            }
            s.assume_init()
        };

        // Grab every free SQE and turn it into a NOP.
        loop {
            let sqe = unsafe { io_uring_get_sqe(&mut ring) };
            if sqe.is_null() {
                break;
            }
            unsafe { io_uring_prep_nop(sqe) };
        }
        let ret = unsafe { io_uring_submit(&mut ring) };
        if ret < 0 {
            panic!("io_uring_submit: {:?}", Error::from_raw_os_error(-ret));
        }

        // io_uring_submit returns the number of SQEs accepted; wait for
        // exactly that many completions.
        let mut cqe: *mut io_uring_cqe = std::ptr::null_mut();
        let pending = ret;
        for _ in 0..pending {
            let ret = unsafe { io_uring_wait_cqe(&mut ring, &mut cqe) };
            if ret < 0 {
                panic!(
                    "io_uring_wait_cqe: {:?}",
                    Error::from_raw_os_error(-ret)
                );
            }
            if unsafe { (*cqe).res } < 0 {
                eprintln!("(*cqe).res = {}", unsafe { (*cqe).res });
            }
            unsafe { io_uring_cqe_seen(&mut ring, cqe) };
        }

        unsafe { io_uring_queue_exit(&mut ring) };
    }
}