1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
use std::io;
use std::mem;
use std::os::unix::io::RawFd;
use std::ptr::{self, NonNull};
use std::marker::PhantomData;
use std::time::Duration;

use super::{IoUring, sys};

// Opcode values mirroring the kernel's `enum io_uring_op` (<linux/io_uring.h>).
// Kept in sync by hand; the numeric gap (6..=10) covers opcodes this module
// does not currently wrap (poll, sync_file_range, sendmsg, recvmsg, ...).
const IORING_OP_NOP:            u8  = 0;
const IORING_OP_READV:          u8  = 1;
const IORING_OP_WRITEV:         u8  = 2;
const IORING_OP_FSYNC:          u8  = 3;
const IORING_OP_READ_FIXED:     u8  = 4;
const IORING_OP_WRITE_FIXED:    u8  = 5;
const IORING_OP_TIMEOUT:        u8  = 11;

/// A handle to the submission side of an [`IoUring`]: prepares and submits
/// submission queue events (SQEs) to the kernel.
pub struct SubmissionQueue<'ring> {
    // Raw pointer into the parent `IoUring`'s liburing state; established
    // non-null in `new` via `NonNull::from`.
    ring: NonNull<sys::io_uring>,
    // Ties this handle's lifetime to the `IoUring` it was created from, so
    // the ring cannot be dropped while a `SubmissionQueue` exists.
    _marker: PhantomData<&'ring mut IoUring>,
}

impl<'ring> SubmissionQueue<'ring> {
    pub(crate) fn new(ring: &'ring IoUring) -> SubmissionQueue<'ring> {
        SubmissionQueue {
            ring: NonNull::from(&ring.ring),
            _marker: PhantomData,
        }
    }

    /// Returns the next available submission queue event, or `None` if the
    /// submission queue is currently full (call [`submit`](Self::submit) to
    /// flush entries and free slots).
    ///
    /// The returned SQE is zeroed before being handed out, so callers always
    /// start from a clean slate.
    pub fn next_sqe<'a>(&'a mut self) -> Option<SubmissionQueueEvent<'a>> {
        unsafe {
            // SAFETY: `self.ring` points at a valid, initialized ring for
            // the whole lifetime `'ring` (guaranteed by `new`).
            let sqe = sys::io_uring_get_sqe(self.ring.as_ptr());
            // liburing returns null when the SQ ring has no free slots.
            if sqe.is_null() {
                None
            } else {
                let mut sqe = SubmissionQueueEvent::new(&mut *sqe);
                sqe.clear();
                Some(sqe)
            }
        }
    }

    /// Submits all prepared SQEs to the kernel, returning how many entries
    /// were submitted.
    pub fn submit(&mut self) -> io::Result<usize> {
        resultify!(unsafe { sys::io_uring_submit(self.ring.as_ptr()) })
    }

    /// Submits all prepared SQEs and blocks until at least `wait_for`
    /// completions are available.
    pub fn submit_and_wait(&mut self, wait_for: u32) -> io::Result<usize> {
        resultify!(unsafe { sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _) })
    }

    /// Like [`submit_and_wait`](Self::submit_and_wait), but also enqueues an
    /// `IORING_OP_TIMEOUT` entry so the wait lasts at most `duration`.
    ///
    /// The timeout completion carries `LIBURING_UDATA_TIMEOUT` as its user
    /// data (set by `prep_timeout`) so it can be told apart from real events.
    pub fn submit_and_wait_with_timeout(&mut self, wait_for: u32, duration: Duration)
        -> io::Result<usize>
    {
        let ts = sys::__kernel_timespec {
            tv_sec: duration.as_secs() as _,
            tv_nsec: duration.subsec_nanos() as _
        };

        loop {
            if let Some(mut sqe) = self.next_sqe() {
                // `next_sqe` already zeroed the SQE; no extra `clear()` is
                // needed before preparing the timeout.
                unsafe {
                    // SAFETY: the kernel copies the timespec during the
                    // submit call below, which returns before `ts` goes out
                    // of scope, so the borrow never dangles.
                    sqe.prep_timeout(&ts);
                    return resultify!(sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _))
                }
            }

            // SQ ring was full: flush pending entries to free a slot for the
            // timeout SQE, then retry.
            self.submit()?;
        }
    }
}

// SAFETY: NOTE(review) — these impls assert that a `SubmissionQueue` handle
// may be moved to and shared across threads. That is only sound if all
// mutation of the underlying ring goes through `&mut self` methods (as it
// does above) and liburing's submission-side calls require no additional
// synchronization — confirm against the liburing threading guarantees.
unsafe impl<'ring> Send for SubmissionQueue<'ring> { }
unsafe impl<'ring> Sync for SubmissionQueue<'ring> { }

/// A single submission queue event (SQE): one slot in the submission ring,
/// borrowed mutably from the ring for the duration of preparation.
pub struct SubmissionQueueEvent<'a> {
    // Exclusive borrow of the raw kernel-visible SQE being prepared.
    sqe: &'a mut sys::io_uring_sqe,
}

impl<'a> SubmissionQueueEvent<'a> {
    pub(crate) fn new(sqe: &'a mut sys::io_uring_sqe) -> SubmissionQueueEvent<'a> {
        SubmissionQueueEvent { sqe }
    }

    /// Returns the user data attached to this event, echoed back on its
    /// completion queue event.
    pub fn user_data(&self) -> u64 {
        self.sqe.user_data as u64
    }

    /// Attaches caller-chosen user data to this event.
    pub fn set_user_data(&mut self, user_data: u64) {
        self.sqe.user_data = user_data as _;
    }

    /// Returns the submission flags currently set on this event.
    ///
    /// Uses `from_bits_truncate` rather than `from_bits_unchecked`: the raw
    /// flags byte can legitimately carry kernel flag bits this wrapper does
    /// not model, and constructing a `SubmissionFlags` with undeclared bits
    /// would violate the bitflags type invariant. Unknown bits are dropped.
    pub fn flags(&self) -> SubmissionFlags {
        SubmissionFlags::from_bits_truncate(self.sqe.flags as _)
    }

    /// Overwrites the submission flags on this event.
    pub fn set_flags(&mut self, flags: SubmissionFlags) {
        self.sqe.flags = flags.bits() as _;
    }

    /// Prepares a vectored read (`IORING_OP_READV`) from `fd` at `offset`.
    ///
    /// # Safety
    ///
    /// `bufs` must remain valid (not moved, dropped, or reused) until the
    /// corresponding completion event is seen.
    #[inline]
    pub unsafe fn prep_read_vectored(
        &mut self,
        fd: RawFd,
        bufs: &mut [io::IoSliceMut<'_>],
        offset: usize,
    ) {
        self.sqe.opcode = IORING_OP_READV as _;
        self.sqe.fd = fd;
        self.sqe.off_addr2.off = offset as _;
        // IoSliceMut is guaranteed ABI-compatible with iovec, so the slice
        // pointer can be handed to the kernel directly.
        self.sqe.addr = bufs.as_mut_ptr() as _;
        self.sqe.len = bufs.len() as _;
    }

    /// Prepares a read (`IORING_OP_READ_FIXED`) into a pre-registered buffer.
    ///
    /// # Safety
    ///
    /// `buf` must be (part of) the buffer registered at `buf_index` and must
    /// remain valid until the corresponding completion event is seen.
    #[inline]
    pub unsafe fn prep_read_fixed(
        &mut self,
        fd: RawFd,
        buf: &mut [u8],
        offset: usize,
        buf_index: usize,
    ) {
        self.sqe.opcode = IORING_OP_READ_FIXED as _;
        self.sqe.fd = fd;
        self.sqe.off_addr2.off = offset as _;
        self.sqe.addr = buf.as_mut_ptr() as _;
        self.sqe.len = buf.len() as _;
        self.sqe.buf_index.buf_index = buf_index as _;
        self.sqe.flags |= SubmissionFlags::FIXED_FILE.bits();
    }

    /// Prepares a vectored write (`IORING_OP_WRITEV`) to `fd` at `offset`.
    ///
    /// # Safety
    ///
    /// `bufs` must remain valid until the corresponding completion event is
    /// seen.
    #[inline]
    pub unsafe fn prep_write_vectored(
        &mut self,
        fd: RawFd,
        bufs: &[io::IoSlice<'_>],
        offset: usize,
    ) {
        self.sqe.opcode = IORING_OP_WRITEV as _;
        self.sqe.fd = fd;
        self.sqe.off_addr2.off = offset as _;
        // IoSlice is guaranteed ABI-compatible with iovec.
        self.sqe.addr = bufs.as_ptr() as _;
        self.sqe.len = bufs.len() as _;
    }

    /// Prepares a write (`IORING_OP_WRITE_FIXED`) from a pre-registered
    /// buffer.
    ///
    /// # Safety
    ///
    /// `buf` must be (part of) the buffer registered at `buf_index` and must
    /// remain valid until the corresponding completion event is seen.
    #[inline]
    pub unsafe fn prep_write_fixed(
        &mut self,
        fd: RawFd,
        buf: &[u8],
        offset: usize,
        buf_index: usize,
    ) {
        self.sqe.opcode = IORING_OP_WRITE_FIXED as _;
        self.sqe.fd = fd;
        self.sqe.off_addr2.off = offset as _;
        self.sqe.addr = buf.as_ptr() as _;
        self.sqe.len = buf.len() as _;
        self.sqe.buf_index.buf_index = buf_index as _;
        self.sqe.flags |= SubmissionFlags::FIXED_FILE.bits();
    }

    /// Prepares an fsync (`IORING_OP_FSYNC`) on `fd`.
    ///
    /// # Safety
    ///
    /// `fd` must remain open until the corresponding completion event is
    /// seen.
    #[inline]
    pub unsafe fn prep_fsync(&mut self, fd: RawFd, flags: FsyncFlags) {
        self.sqe.opcode = IORING_OP_FSYNC as _;
        self.sqe.fd = fd;
        self.sqe.off_addr2.off = 0;
        self.sqe.addr = 0;
        self.sqe.len = 0;
        self.sqe.cmd_flags.fsync_flags = flags.bits() as _;
    }

    /// Prepares a timeout (`IORING_OP_TIMEOUT`) event. Its completion carries
    /// `LIBURING_UDATA_TIMEOUT` as user data so it can be distinguished from
    /// real IO completions.
    ///
    /// # Safety
    ///
    /// `ts` must remain valid until this SQE has been submitted (the kernel
    /// copies the timespec at submission time).
    #[inline]
    pub unsafe fn prep_timeout(&mut self, ts: &sys::__kernel_timespec) {
        self.sqe.opcode = IORING_OP_TIMEOUT as _;
        self.sqe.fd = 0;
        self.sqe.addr = ts as *const sys::__kernel_timespec as _;
        // len = 1: a single timespec is pointed to by addr.
        self.sqe.len = 1;
        self.sqe.user_data = sys::LIBURING_UDATA_TIMEOUT;
        // NOTE(review): `off` (the completion-count trigger) is not set here;
        // this relies on the SQE having been zeroed beforehand (as `next_sqe`
        // guarantees) — confirm all call sites go through `next_sqe`.
    }

    /// Prepares a no-op (`IORING_OP_NOP`) event.
    #[inline]
    pub unsafe fn prep_nop(&mut self) {
        self.sqe.opcode = IORING_OP_NOP;
        self.sqe.fd = 0;
        self.sqe.off_addr2.off = 0;
        self.sqe.addr = 0;
        self.sqe.len = 0;
    }

    /// Resets every field of this event to zero.
    pub fn clear(&mut self) {
        // SAFETY: io_uring_sqe is a plain C struct for which the all-zero
        // bit pattern is a valid (NOP-shaped) value.
        *self.sqe = unsafe { mem::zeroed() };
    }

    /// Borrows the raw SQE for inspection.
    pub fn raw(&self) -> &sys::io_uring_sqe {
        &self.sqe
    }

    /// Borrows the raw SQE mutably, for fields this wrapper does not expose.
    pub fn raw_mut(&mut self) -> &mut sys::io_uring_sqe {
        &mut self.sqe
    }
}

// SAFETY: NOTE(review) — `SubmissionQueueEvent` holds an exclusive `&mut`
// borrow of one SQE slot, so no two handles alias the same slot. `Send` looks
// sound on that basis; `Sync` additionally asserts that shared (`&self`)
// access from multiple threads is safe, which holds only because the `&self`
// methods are read-only — confirm this invariant is kept as methods are added.
unsafe impl<'a> Send for SubmissionQueueEvent<'a> { }
unsafe impl<'a> Sync for SubmissionQueueEvent<'a> { }

bitflags::bitflags! {
    /// Per-SQE submission flags, mirroring the kernel's `IOSQE_*` bits.
    pub struct SubmissionFlags: u8 {
        const FIXED_FILE    = 1 << 0;   // fd indexes the registered (fixed) fileset
        const IO_DRAIN      = 1 << 1;   // issue only after all in-flight IO completes
        const IO_LINK       = 1 << 2;   // the next SQE depends on this one completing
    }
}

bitflags::bitflags! {
    /// Flags for `prep_fsync`, mirroring the kernel's `IORING_FSYNC_*` bits.
    pub struct FsyncFlags: u32 {
        // Sync data only (like fdatasync(2)), skipping metadata when possible.
        const FSYNC_DATASYNC    = 1 << 0;
    }
}