1use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12 self, IORING_ENTER_GETEVENTS, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES,
13 IORING_OP_ACCEPT, IORING_OP_NOP, IORING_OP_READ, IORING_OP_RECV, IORING_OP_WRITE,
14 IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
15 SOCK_CLOEXEC, SOCK_NONBLOCK, SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
16};
17use crate::layout::{IoUringParams, IoUringSqe};
18use crate::pbr::ProvidedBufRing;
19
20pub struct IoUring {
22 ring_fd: c_int,
23 sq_mmap: *mut c_void,
24 sq_mmap_len: usize,
25 cq_mmap: *mut c_void,
26 cq_mmap_len: usize,
27 sqes: *mut IoUringSqe,
28 sqes_len: usize,
29 sq_entries: u32,
30 sq_mask: u32,
31 sq_tail: u32,
33 sq_khead: *const AtomicU32,
34 sq_ktail: *const AtomicU32,
35 sq_array: *mut u32,
36 cq_mask: u32,
37 cq_khead: *const AtomicU32,
38 cq_ktail: *const AtomicU32,
39 cqes: *const Completion,
40}
41
42unsafe impl Send for IoUring {}
46
/// SQ ring pointers and values resolved from the kernel-reported offsets.
///
/// Produced by `IoUring::sq_cursors` and copied field-by-field into `IoUring`.
struct SqCursors {
    /// Ring mask read from the SQ ring header.
    mask: u32,
    /// Value of the shared tail at the moment the cursors were resolved.
    tail: u32,
    /// Shared head index inside the SQ ring mapping.
    khead: *const AtomicU32,
    /// Shared tail index inside the SQ ring mapping.
    ktail: *const AtomicU32,
    /// Index array mapping SQ slots to SQE entries.
    array: *mut u32,
}
55
56struct CqCursors {
58 khead: *const AtomicU32,
59 ktail: *const AtomicU32,
60 cqes: *const Completion,
61 mask: u32,
62}
63
64impl IoUring {
65 pub fn new(entries: u32) -> io::Result<IoUring> {
67 let (ring_fd, p) = Self::setup_ring(entries)?;
68 let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
69 let (sq_mmap, cq_mmap, sqes_map) =
70 Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
71
72 let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
75 let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
76
77 Ok(IoUring {
78 ring_fd,
79 sq_mmap,
80 sq_mmap_len: sq_len,
81 cq_mmap,
82 cq_mmap_len: cq_len,
83 sqes: sqes_map as *mut IoUringSqe,
84 sqes_len,
85 sq_entries: p.sq_entries,
86 sq_mask: sq.mask,
87 sq_tail: sq.tail,
88 sq_khead: sq.khead,
89 sq_ktail: sq.ktail,
90 sq_array: sq.array,
91 cq_mask: cq.mask,
92 cq_khead: cq.khead,
93 cq_ktail: cq.ktail,
94 cqes: cq.cqes,
95 })
96 }
97
98 fn map_three_regions(
102 ring_fd: c_int,
103 sq_len: usize,
104 cq_len: usize,
105 sqes_len: usize,
106 ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
107 let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
108 unsafe { ffi::close(ring_fd) };
110 })?;
111 let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
112 unsafe {
114 ffi::munmap(sq_mmap, sq_len);
115 ffi::close(ring_fd);
116 }
117 })?;
118 let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
119 unsafe {
121 ffi::munmap(cq_mmap, cq_len);
122 ffi::munmap(sq_mmap, sq_len);
123 ffi::close(ring_fd);
124 }
125 })?;
126 Ok((sq_mmap, cq_mmap, sqes_map))
127 }
128
129 fn setup_ring(entries: u32) -> io::Result<(c_int, IoUringParams)> {
131 let mut p = IoUringParams::default();
132 let fd = unsafe {
134 ffi::syscall(
135 SYS_IO_URING_SETUP,
136 entries as c_long,
137 &mut p as *mut IoUringParams,
138 )
139 };
140 if fd < 0 {
141 return Err(io::Error::last_os_error());
142 }
143 Ok((fd as c_int, p))
144 }
145
146 fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
148 let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
149 let cq_len =
150 (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
151 let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
152 (sq_len, cq_len, sqes_len)
153 }
154
155 fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
157 let m = unsafe {
160 ffi::mmap(
161 ptr::null_mut(),
162 len,
163 PROT_READ | PROT_WRITE,
164 MAP_SHARED | MAP_POPULATE,
165 ring_fd,
166 off,
167 )
168 };
169 if m as isize == -1 {
170 return Err(io::Error::last_os_error());
171 }
172 Ok(m)
173 }
174
175 unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
182 let base = sq_mmap as usize;
183 let at = |off: u32| (base + off as usize) as *const AtomicU32;
184 let khead = at(p.sq_off.head);
185 let ktail = at(p.sq_off.tail);
186 let array = (base + p.sq_off.array as usize) as *mut u32;
187 let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
189 let tail = unsafe { (*ktail).load(Ordering::Acquire) };
192 SqCursors { khead, ktail, array, mask, tail }
193 }
194
195 unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
201 let base = cq_mmap as usize;
202 let at = |off: u32| (base + off as usize) as *const AtomicU32;
203 let khead = at(p.cq_off.head);
204 let ktail = at(p.cq_off.tail);
205 let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
206 let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
208 CqCursors { khead, ktail, cqes, mask }
209 }
210
211 fn reserve(&mut self) -> Option<usize> {
214 let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
216 if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
217 return None; }
219 let idx = (self.sq_tail & self.sq_mask) as usize;
220 unsafe { *self.sq_array.add(idx) = idx as u32 };
223 self.sq_tail = self.sq_tail.wrapping_add(1);
224 Some(idx)
225 }
226
227 pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
234 let Some(idx) = self.reserve() else {
235 return false;
236 };
237 unsafe {
239 ptr::write(
240 self.sqes.add(idx),
241 IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
242 );
243 }
244 true
245 }
246
247 pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
254 let Some(idx) = self.reserve() else {
255 return false;
256 };
257 unsafe {
259 ptr::write(
260 self.sqes.add(idx),
261 IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
262 );
263 }
264 true
265 }
266
267 pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
274 let Some(idx) = self.reserve() else {
275 return false;
276 };
277 unsafe {
279 let sqe = self.sqes.add(idx);
280 ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
282 (*sqe).ioprio = IORING_RECV_MULTISHOT;
283 (*sqe).flags = IOSQE_BUFFER_SELECT;
284 (*sqe).buf_index = bgid;
286 }
287 true
288 }
289
290 pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
294 let Some(idx) = self.reserve() else {
295 return false;
296 };
297 unsafe {
299 let sqe = self.sqes.add(idx);
300 ptr::write(
301 sqe,
302 IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
303 );
304 (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
305 }
306 true
307 }
308
309 pub fn prep_nop(&mut self, user_data: u64) -> bool {
312 let Some(idx) = self.reserve() else {
313 return false;
314 };
315 unsafe {
317 ptr::write(
318 self.sqes.add(idx),
319 IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
320 );
321 }
322 true
323 }
324
325 pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
328 let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
330 let to_submit = self.sq_tail.wrapping_sub(prev);
331 unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
333 let flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
334 let ret = unsafe {
336 ffi::syscall(
337 SYS_IO_URING_ENTER,
338 self.ring_fd as c_long,
339 to_submit as c_long,
340 wait_nr as c_long,
341 flags as c_long,
342 ptr::null::<c_void>(),
343 0usize,
344 )
345 };
346 if ret < 0 {
347 return Err(io::Error::last_os_error());
348 }
349 Ok(ret as u32)
350 }
351
352 pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
354 let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
356 let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
357 let mut n = 0;
358 while head != tail {
359 let idx = (head & self.cq_mask) as usize;
360 let cqe = unsafe { *self.cqes.add(idx) };
362 f(cqe);
363 head = head.wrapping_add(1);
364 n += 1;
365 }
366 unsafe { (*self.cq_khead).store(head, Ordering::Release) };
368 n
369 }
370
371 pub fn register_buf_ring(
378 &self,
379 entries: u16,
380 buf_size: u32,
381 bgid: u16,
382 ) -> io::Result<ProvidedBufRing> {
383 ProvidedBufRing::new(self.ring_fd, entries, buf_size, bgid)
384 }
385}
386
387impl Drop for IoUring {
388 fn drop(&mut self) {
389 unsafe {
391 ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
392 ffi::munmap(self.cq_mmap, self.cq_mmap_len);
393 ffi::munmap(self.sq_mmap, self.sq_mmap_len);
394 ffi::close(self.ring_fd);
395 }
396 }
397}