use crate as linux_raw;
use core::sync::atomic::{AtomicU32, Ordering};
use core::time::Duration;

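/// A thin userspace wrapper around a Linux `io_uring` instance.
///
/// Owns the ring and SQE memory mappings and keeps locally cached copies of the
/// submission/completion head and tail indices; the shared ring indices are only
/// read and written when work is actually submitted.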
pub struct IoUring {
    submit_head_pointer: *const AtomicU32,
    submit_tail_pointer: *const AtomicU32,
    submit_head: u32,
    submit_tail: u32,
    submit_ring_mask: u32,
    submit_capacity: u32,

    cqes: *const linux_raw::io_uring_cqe,
    completion_head_pointer: *const AtomicU32,
    completion_tail_pointer: *const AtomicU32,
    completion_head: u32,
    completion_tail: u32,
    completion_ring_mask: u32,

    fd: linux_raw::Fd,
    _ring_map: linux_raw::Mmap,
    sqes_map: linux_raw::Mmap,
}

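// SAFETY: The raw pointers all point into memory mappings owned by this struct,
// and the rings are only mutated through `&mut self`, so moving or sharing an
// `IoUring` across threads is sound.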
unsafe impl Send for IoUring {}
unsafe impl Sync for IoUring {}

impl IoUring {
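    /// Creates a new ring with room for `queue_size` submission entries,
    /// mapping the submission/completion rings and the SQE array into memory.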
    pub fn new(queue_size: u32) -> Result<Self, linux_raw::Error> {
        let mut params = linux_raw::io_uring_params::default();
        let fd = linux_raw::sys_io_uring_setup(queue_size, &mut params)?;
        let sring_size = params.sq_off.array + params.sq_entries * core::mem::size_of::<u32>() as u32;
        let cring_size = params.cq_off.cqes + params.cq_entries * core::mem::size_of::<linux_raw::io_uring_cqe>() as u32;
        let ring_size = core::cmp::max(sring_size, cring_size);
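        // Map the SQ and CQ rings with a single mmap that is large enough for both
        // (relies on `IORING_FEAT_SINGLE_MMAP`, available since kernel 5.4).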
        let ring_map = unsafe {
            linux_raw::Mmap::map(
                core::ptr::null_mut(),
                ring_size as usize,
                linux_raw::PROT_READ | linux_raw::PROT_WRITE,
                linux_raw::MAP_SHARED | linux_raw::MAP_POPULATE,
                Some(fd.borrow()),
                u64::from(linux_raw::IORING_OFF_SQ_RING),
            )?
        };

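        // The submission queue entries live in their own mapping, separate from the rings.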
        let sqes_map = unsafe {
            linux_raw::Mmap::map(
                core::ptr::null_mut(),
                params.sq_entries as usize * core::mem::size_of::<linux_raw::io_uring_sqe>(),
                linux_raw::PROT_READ | linux_raw::PROT_WRITE,
                linux_raw::MAP_SHARED | linux_raw::MAP_POPULATE,
                Some(fd.borrow()),
                u64::from(linux_raw::IORING_OFF_SQES),
            )?
        };

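        // Resolve the submission ring pointers and fill the indirection array with
        // the identity mapping, so ring slot `i` always refers to SQE `i`.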
        let submit_head_pointer = unsafe { ring_map.as_ptr().byte_add(params.sq_off.head as usize).cast::<AtomicU32>() };
        let submit_tail_pointer = unsafe { ring_map.as_ptr().byte_add(params.sq_off.tail as usize).cast::<AtomicU32>() };
        let submit_capacity = unsafe { *ring_map.as_ptr().byte_add(params.sq_off.ring_entries as usize).cast::<u32>() };
        let array = unsafe { ring_map.as_mut_ptr().byte_add(params.sq_off.array as usize).cast::<u32>() };
        for index in 0..submit_capacity {
            unsafe { array.add(index as usize).write(index) };
        }

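        // Resolve the completion ring pointers.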
        let completion_head_pointer = unsafe { ring_map.as_ptr().byte_add(params.cq_off.head as usize).cast::<AtomicU32>() };
        let completion_tail_pointer = unsafe { ring_map.as_ptr().byte_add(params.cq_off.tail as usize).cast::<AtomicU32>() };

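        // Cache the current indices and ring masks so that queueing and popping
        // don't have to re-read the shared ring on every call.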
        Ok(IoUring {
            submit_head_pointer,
            submit_tail_pointer,
            submit_head: unsafe { (*submit_head_pointer).load(Ordering::Acquire) },
            submit_tail: unsafe { submit_tail_pointer.cast::<u32>().read() },
            submit_ring_mask: unsafe { *ring_map.as_ptr().byte_add(params.sq_off.ring_mask as usize).cast::<u32>() },
            submit_capacity,

            cqes: unsafe {
                ring_map
                    .as_ptr()
                    .byte_add(params.cq_off.cqes as usize)
                    .cast::<linux_raw::io_uring_cqe>()
            },
            completion_head_pointer,
            completion_tail_pointer,
            completion_head: unsafe { completion_head_pointer.cast::<u32>().read() },
            completion_tail: unsafe { (*completion_tail_pointer).load(Ordering::Acquire) },
            completion_ring_mask: unsafe { *ring_map.as_ptr().byte_add(params.cq_off.ring_mask as usize).cast::<u32>() },

            fd,
            _ring_map: ring_map,
            sqes_map,
        })
    }

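    /// Writes `sqe` into the next free submission slot and bumps the local tail;
    /// the kernel only sees the entry once `submit_and_wait` publishes the tail.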
    fn append_sqe(&mut self, sqe: linux_raw::io_uring_sqe) {
        let index = self.submit_tail & self.submit_ring_mask;

        unsafe {
            self.sqes_map
                .as_mut_ptr()
                .cast::<linux_raw::io_uring_sqe>()
                .add(index as usize)
                .write(sqe)
        };
        self.submit_tail = self.submit_tail.wrapping_add(1);
    }

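    /// Queues a read of up to `length` bytes from `fd` into `buffer`, using the
    /// file's current position (offset `-1`).
    ///
    /// `buffer` must remain valid until the read completes.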
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_read(&mut self, user_data: u64, fd: linux_raw::FdRef, buffer: *mut u8, length: u32) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_READ as u8;
        sqe.fd = fd.raw();
        sqe.__bindgen_anon_2.addr = buffer as u64;
        sqe.len = length;
        sqe.__bindgen_anon_1.off = u64::MAX;
        sqe.user_data = user_data;
        self.append_sqe(sqe);

        Ok(())
    }

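    /// Queues a timeout that completes once `duration` elapses, or earlier if
    /// `event_count` other completions are posted first.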
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_timeout(&mut self, user_data: u64, event_count: u32, duration: Duration) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_TIMEOUT as u8;
        sqe.fd = -1;
        let timespec = linux_raw::timespec {
            tv_sec: duration.as_secs() as i64,
            tv_nsec: i64::from(duration.subsec_nanos()),
        };
        sqe.__bindgen_anon_2.addr = core::ptr::addr_of!(timespec) as u64;
        sqe.len = event_count;
        sqe.__bindgen_anon_1.off = 0;
        sqe.user_data = user_data;
        self.append_sqe(sqe);

        Ok(())
    }

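    /// Queues a futex wait on `futex`: the operation completes once the futex is
    /// woken, provided it still held `expected_value` when the wait was armed.
    /// Needs a kernel with `IORING_OP_FUTEX_WAIT` support (6.7+).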
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_futex_wait(&mut self, user_data: u64, futex: *const AtomicU32, expected_value: u32) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_FUTEX_WAIT as u8;
        sqe.__bindgen_anon_2.addr = futex as usize as u64;
        sqe.__bindgen_anon_1.addr2 = u64::from(expected_value);
        unsafe { sqe.__bindgen_anon_6.__bindgen_anon_1.as_mut().addr3 = u64::from(linux_raw::FUTEX_BITSET_MATCH_ANY) };
        sqe.user_data = user_data;
        sqe.fd = linux_raw::FUTEX2_SIZE_U32 as i32;
        self.append_sqe(sqe);

        Ok(())
    }

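    /// Queues a wake-up of at most one waiter on `futex`.
    /// Needs a kernel with `IORING_OP_FUTEX_WAKE` support (6.7+).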
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_futex_wake_one(&mut self, user_data: u64, futex: *const AtomicU32) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_FUTEX_WAKE as u8;
        sqe.__bindgen_anon_2.addr = futex as usize as u64;
        sqe.__bindgen_anon_1.addr2 = 1;
        unsafe { sqe.__bindgen_anon_6.__bindgen_anon_1.as_mut().addr3 = u64::from(linux_raw::FUTEX_BITSET_MATCH_ANY) };
        sqe.user_data = user_data;
        sqe.fd = linux_raw::FUTEX2_SIZE_U32 as i32;
        self.append_sqe(sqe);

        Ok(())
    }

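    /// Queues an asynchronous `waitid` on the child selected by `id_type` and `id`;
    /// the resulting `siginfo_t` is written to `info`, which must stay valid until
    /// the operation completes. Needs a kernel with `IORING_OP_WAITID` support.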
    #[allow(clippy::field_reassign_with_default)]
    pub fn queue_waitid(
        &mut self,
        user_data: u64,
        id_type: u32,
        id: u32,
        info: *mut crate::siginfo_t,
        options: u32,
    ) -> Result<(), linux_raw::Error> {
        if self.queue_length() >= self.queue_capacity() {
            return Err(linux_raw::Error::from("no remaining capacity in the io_uring submission queue"));
        }

        let mut sqe = linux_raw::io_uring_sqe::default();
        sqe.user_data = user_data;
        sqe.opcode = linux_raw::io_uring_op_IORING_OP_WAITID as u8;
        sqe.fd = id as i32;
        sqe.len = id_type;
        sqe.__bindgen_anon_5.file_index = options;
        sqe.__bindgen_anon_1.addr2 = info as u64;
        self.append_sqe(sqe);

        Ok(())
    }

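    /// Returns how many submission entries are queued locally but not yet consumed by the kernel.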
    pub fn queue_length(&self) -> usize {
        self.submit_tail.wrapping_sub(self.submit_head) as usize
    }

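    /// Returns the total capacity of the submission queue.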
    pub fn queue_capacity(&self) -> usize {
        self.submit_capacity as usize
    }

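    /// Publishes all locally queued submission entries and blocks until at least
    /// `min_complete` completions are available.
    ///
    /// # Safety
    ///
    /// Every buffer and pointer referenced by the queued entries must remain valid
    /// until the corresponding operation completes.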
    pub unsafe fn submit_and_wait(&mut self, min_complete: u32) -> Result<(), linux_raw::Error> {
        let count = self.queue_length() as u32;
        if count == 0 && min_complete == 0 {
            return Ok(());
        }

        (*self.submit_tail_pointer).store(self.submit_tail, Ordering::Release);
        (*self.completion_head_pointer).store(self.completion_head, Ordering::Release);
        let result = linux_raw::sys_io_uring_enter(
            self.fd.borrow(),
            count,
            min_complete,
            linux_raw::IORING_ENTER_GETEVENTS,
            core::ptr::null_mut(),
            0,
        );
        self.submit_head = (*self.submit_head_pointer).load(Ordering::Acquire);
        self.completion_tail = (*self.completion_tail_pointer).load(Ordering::Acquire);

        result?;
        Ok(())
    }

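    /// Returns the number of completion entries that are ready to be popped.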
    pub fn finished_count(&self) -> usize {
        self.completion_tail.wrapping_sub(self.completion_head) as usize
    }

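    /// Pops the oldest pending completion entry, if any; the advanced head index is
    /// only published to the kernel on the next `submit_and_wait`.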
    pub fn pop_finished(&mut self) -> Option<linux_raw::io_uring_cqe> {
        if self.finished_count() == 0 {
            return None;
        }

        let index = self.completion_head & self.completion_ring_mask;
        let event = unsafe { self.cqes.add(index as usize).read() };
        self.completion_head = self.completion_head.wrapping_add(1);
        Some(event)
    }

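    /// Synchronously cancels every in-flight operation on this ring and waits for
    /// the cancellations to finish.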
    pub fn cancel_all_sync(&mut self) -> Result<(), linux_raw::Error> {
        let reg = linux_raw::io_uring_sync_cancel_reg {
            addr: 0,
            fd: 0,
            flags: linux_raw::IORING_ASYNC_CANCEL_ANY,
            timeout: linux_raw::arch_bindings::__kernel_timespec { tv_sec: !0, tv_nsec: !0 },
            opcode: 0,
            pad: Default::default(),
            pad2: Default::default(),
        };

        linux_raw::sys_io_uring_register(
            self.fd.borrow(),
            linux_raw::IORING_REGISTER_SYNC_CANCEL,
            core::ptr::addr_of!(reg).cast(),
            1,
        )
    }
}

impl linux_raw::io_uring_cqe {
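    /// Converts the completion's raw `res` field into a `Result`, treating negative
    /// values as errno-style errors.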
    pub fn to_result(&self) -> Result<i32, linux_raw::Error> {
        linux_raw::Error::from_syscall("io_uring", i64::from(self.res))?;
        Ok(self.res)
    }
}

#[allow(clippy::unwrap_used)]
#[test]
fn test_io_uring_read() {
    let mut io_uring = IoUring::new(2).unwrap();
    let fd = crate::sys_open(crate::cstr!("/dev/zero"), crate::O_CLOEXEC).unwrap();

    let mut buffer = [0xff; 5];
    assert_eq!(io_uring.queue_length(), 0);
    io_uring.queue_read(0x1234, fd.borrow(), buffer.as_mut_ptr(), 1).unwrap();
    assert_eq!(io_uring.queue_length(), 1);
    io_uring.queue_read(0x1235, fd.borrow(), buffer[2..].as_mut_ptr(), 2).unwrap();
    assert_eq!(io_uring.queue_length(), 2);
    assert_eq!(io_uring.finished_count(), 0);
    unsafe {
        io_uring.submit_and_wait(2).unwrap();
    }
    assert_eq!(io_uring.queue_length(), 0);
    assert_eq!(io_uring.finished_count(), 2);
    assert_eq!(buffer, [0, 0xff, 0, 0, 0xff]);

    let mut completion_1 = io_uring.pop_finished().unwrap();
    let mut completion_2 = io_uring.pop_finished().unwrap();
    let completion_3 = io_uring.pop_finished();
    assert!(completion_3.is_none());

    if completion_1.user_data == 0x1235 {
        core::mem::swap(&mut completion_1, &mut completion_2);
    }

    assert_eq!(completion_1.user_data, 0x1234);
    assert_eq!(completion_1.res, 1);
    assert_eq!(completion_2.user_data, 0x1235);
    assert_eq!(completion_2.res, 2);
}

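/// Test-only helper: extracts the running kernel's `(major, minor)` version from `uname`'s release string.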
#[cfg(test)]
fn get_kernel_version() -> Result<(u32, u32), linux_raw::Error> {
    let uname = crate::sys_uname()?;
    let Ok(release) = unsafe { core::ffi::CStr::from_ptr(uname.release.as_ptr().cast()) }.to_str() else {
        return Err(linux_raw::Error::from_str(
            "failed to parse the kernel's release string: not valid UTF-8",
        ));
    };

    let mut iter = release.split('.');
    let Some(major) = iter.next().and_then(|major| major.parse::<u32>().ok()) else {
        return Err(linux_raw::Error::from_str("failed to extract the kernel's major version"));
    };

    let Some(minor) = iter.next().and_then(|minor| minor.parse::<u32>().ok()) else {
        return Err(linux_raw::Error::from_str("failed to extract the kernel's minor version"));
    };

    Ok((major, minor))
}

#[allow(clippy::unwrap_used)]
#[test]
fn test_io_uring_futex_wait() {
    extern crate std;
    use std::sync::Arc;

    let (major, minor) = get_kernel_version().unwrap();
    if !(major > 6 || (major == 6 && minor > 7)) {
        return;
    }

    let futex = Arc::new(AtomicU32::new(0));
    let mut io_uring = IoUring::new(2).unwrap();
    io_uring.queue_futex_wait(0x1234, &*futex, 0).unwrap();

    let futex_clone = Arc::clone(&futex);
    std::thread::spawn(move || {
        std::thread::sleep(core::time::Duration::from_millis(25));
        futex_clone.store(1, Ordering::Relaxed);
        crate::sys_futex_wake_one(&futex_clone).unwrap();
    });

    unsafe {
        io_uring.submit_and_wait(1).unwrap();
    }
    let completion = io_uring.pop_finished().unwrap();
    completion.to_result().unwrap();
    assert_eq!(futex.load(Ordering::Relaxed), 1);
}

#[allow(clippy::unwrap_used)]
#[test]
fn test_io_uring_futex_wake() {
    extern crate std;
    use std::sync::Arc;

    let (major, minor) = get_kernel_version().unwrap();
    if !(major > 6 || (major == 6 && minor > 7)) {
        return;
    }

    let futex = Arc::new(AtomicU32::new(0));
    let futex_clone = Arc::clone(&futex);
    std::thread::spawn(move || {
        std::thread::sleep(core::time::Duration::from_millis(25));
        futex_clone.store(1, Ordering::Relaxed);
        let mut io_uring = IoUring::new(2).unwrap();
        io_uring.queue_futex_wake_one(0x1234, &*futex_clone).unwrap();
        unsafe {
            io_uring.submit_and_wait(1).unwrap();
        }
        let completion = io_uring.pop_finished().unwrap();
        completion.to_result().unwrap();
    });

    linux_raw::sys_futex_wait(&futex, 0, Some(core::time::Duration::from_secs(1))).unwrap();
    assert_eq!(futex.load(Ordering::Relaxed), 1);
}